Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/02/15 23:42:59 UTC

[GitHub] [pulsar] sijie opened a new pull request #9590: [WIP] Support writing general records to Pulsar sink

sijie opened a new pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#issuecomment-779513363


   The unit tests are added. Integration tests to be added.





[GitHub] [pulsar] eolivelli commented on pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#issuecomment-793731449


   @sijie do you have time to complete this patch?
   
   It is very useful for a couple of use cases I have seen, and I would really like this work to land on master.
   The patch has already been approved by @codelipenghui and @freeznet.





[GitHub] [pulsar] zymap commented on pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
zymap commented on pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#issuecomment-784057582


   Failed tests:
   
   ```
   ~~~~~~~ SKIPPED -- [TestClass name=class org.apache.pulsar.tests.integration.io.GenericRecordSourceTest].testGenericRecordSource([])-------------- Starting test [TestClass name=class org.apache.pulsar.tests.integration.io.GenericRecordSourceTest].testGenericRecordSource([])-------
   07:19:45.283 [TestNG-method=testGenericRecordSource-1:org.apache.pulsar.tests.integration.io.GenericRecordSourceTest@112] INFO  org.apache.pulsar.tests.integration.io.GenericRecordSourceTest - Run command : /pulsar/bin/pulsar-admin sources create --name test-state-source-isolqgjy --destinationTopicName test-state-source-output-zrbylspk --archive /pulsar/examples/java-test-functions.jar --classname org.apache.pulsar.tests.integration.io.GenericRecordSource
   07:19:45.288 [docker-java-stream-57079303:org.apache.pulsar.tests.integration.utils.DockerUtils$2@216] INFO  org.apache.pulsar.tests.integration.utils.DockerUtils - DOCKER.exec(ckmqvcaq-standalone:/pulsar/bin/pulsar-admin sources create --name test-state-source-isolqgjy --destinationTopicName test-state-source-output-zrbylspk --archive /pulsar/examples/java-test-functions.jar --classname org.apache.pulsar.tests.integration.io.GenericRecordSource): Executing...
   07:19:47.833 [docker-java-stream-57079303:org.apache.pulsar.tests.integration.utils.DockerUtils$2@221] INFO  org.apache.pulsar.tests.integration.utils.DockerUtils - DOCKER.exec(ckmqvcaq-standalone:/pulsar/bin/pulsar-admin sources create --name test-state-source-isolqgjy --destinationTopicName test-state-source-output-zrbylspk --archive /pulsar/examples/java-test-functions.jar --classname org.apache.pulsar.tests.integration.io.GenericRecordSource): STDERR: Source class org.apache.pulsar.tests.integration.io.GenericRecordSource must be in class path
   07:19:47.834 [docker-java-stream-57079303:org.apache.pulsar.tests.integration.utils.DockerUtils$2@221] INFO  org.apache.pulsar.tests.integration.utils.DockerUtils - DOCKER.exec(ckmqvcaq-standalone:/pulsar/bin/pulsar-admin sources create --name test-state-source-isolqgjy --destinationTopicName test-state-source-output-zrbylspk --archive /pulsar/examples/java-test-functions.jar --classname org.apache.pulsar.tests.integration.io.GenericRecordSource): STDERR: Reason: Source class org.apache.pulsar.tests.integration.io.GenericRecordSource must be in class path
   07:19:48.163 [docker-java-stream-57079303:org.apache.pulsar.tests.integration.utils.DockerUtils$2@236] INFO  org.apache.pulsar.tests.integration.utils.DockerUtils - DOCKER.exec(ckmqvcaq-standalone:/pulsar/bin/pulsar-admin sources create --name test-state-source-isolqgjy --destinationTopicName test-state-source-output-zrbylspk --archive /pulsar/examples/java-test-functions.jar --classname org.apache.pulsar.tests.integration.io.GenericRecordSource): Done
   07:19:48.167 [docker-java-stream-57079303:org.apache.pulsar.tests.integration.utils.DockerUtils$2@254] INFO  org.apache.pulsar.tests.integration.utils.DockerUtils - DOCKER.exec(ckmqvcaq-standalone:/pulsar/bin/pulsar-admin sources create --name test-state-source-isolqgjy --destinationTopicName test-state-source-output-zrbylspk --archive /pulsar/examples/java-test-functions.jar --classname org.apache.pulsar.tests.integration.io.GenericRecordSource): completed with 1
   07:19:48.167 [docker-java-stream-57079303:org.apache.pulsar.tests.integration.utils.DockerUtils$2@257] ERROR org.apache.pulsar.tests.integration.utils.DockerUtils - DOCKER.exec(ckmqvcaq-standalone:/pulsar/bin/pulsar-admin sources create --name test-state-source-isolqgjy --destinationTopicName test-state-source-output-zrbylspk --archive /pulsar/examples/java-test-functions.jar --classname org.apache.pulsar.tests.integration.io.GenericRecordSource): completed with non zero return code: 1
   stdout: 
   stderr: Source class org.apache.pulsar.tests.integration.io.GenericRecordSource must be in class path
   
   Reason: Source class org.apache.pulsar.tests.integration.io.GenericRecordSource must be in class path
   
   !!!!!!!!! FAILURE-- [TestClass name=class org.apache.pulsar.tests.integration.io.GenericRecordSourceTest].testGenericRecordSource([])-------
   Error:  Tests run: 5, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 206.535 s <<< FAILURE! - in TestSuite
   Error:  pulsar-standalone-suite(org.apache.pulsar.tests.integration.io.GenericRecordSourceTest)  Time elapsed: 2.904 s  <<< FAILURE!
   org.apache.pulsar.tests.integration.docker.ContainerExecException: /pulsar/bin/pulsar-admin sources create --name test-state-source-isolqgjy --destinationTopicName test-state-source-output-zrbylspk --archive /pulsar/examples/java-test-functions.jar --classname org.apache.pulsar.tests.integration.io.GenericRecordSource failed on 267d5fc4c5f0ad1512ae6ba8588f32236a6d72ddd840ba940a24d9d2a94de872 with error code 1
   	at org.apache.pulsar.tests.integration.utils.DockerUtils$2.onComplete(DockerUtils.java:259)
   	at org.testcontainers.shaded.com.github.dockerjava.core.exec.AbstrAsyncDockerCmdExec$1.onComplete(AbstrAsyncDockerCmdExec.java:51)
   	at org.testcontainers.shaded.com.github.dockerjava.core.DefaultInvocationBuilder.lambda$executeAndStream$1(DefaultInvocationBuilder.java:276)
   	at java.lang.Thread.run(Thread.java:748)
   
   [INFO] 
   [INFO] Results:
   [INFO] 
   Error:  Failures: 
   Error:  org.apache.pulsar.tests.integration.io.GenericRecordSourceTest.pulsar-standalone-suite(org.apache.pulsar.tests.integration.io.GenericRecordSourceTest)
   [INFO]   Run 1: PASS
   Error:    Run 2: GenericRecordSourceTest.testGenericRecordSource » ContainerExec /pulsar/bin/pu...
   [INFO] 
   [INFO] 
   Error:  Tests run: 4, Failures: 1, Errors: 0, Skipped: 0
   ```





[GitHub] [pulsar] eolivelli commented on pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#issuecomment-780172178


   @sijie
   
   I totally agree that the main point here is to prevent the PulsarSink from creating the Producer and forcing a Schema on the topic in case of `GenericRecord` type.
   So I am fine with this approach as well.
   
   If you are okay with it, I can merge this patch into my branch at #9481 (and revert the changes to TopicSchema), as we already have integration tests there and I can continue the work.
   If you prefer, I can instead close my PR and let you complete your patch, but please add an integration test like mine (which basically covers my use case).
   
   I just want to see this feature land on the master branch and become available to our users.





[GitHub] [pulsar] freeznet commented on pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
freeznet commented on pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#issuecomment-794765433


   > @sijie do you have time to complete this patch.
   > 
   > It is very useful for a couple of usecases I saw and I really would like this work to land to master
   > the patch has already been approved by @codelipenghui and @freeznet
   
   @eolivelli I am working with @sijie to fix the CI failure; we will update and finish this PR soon.





[GitHub] [pulsar] sijie commented on pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#issuecomment-780263213


   I will complete the integration tests here.





[GitHub] [pulsar] sijie commented on a change in pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#discussion_r577022682



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
##########
@@ -400,7 +415,16 @@ public void close() throws Exception {
         ConsumerConfig consumerConfig = new ConsumerConfig();
         consumerConfig.setSchemaProperties(pulsarSinkConfig.getSchemaProperties());
         if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
-            consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
+            if (GenericRecord.class.isAssignableFrom(typeArg)) {

Review comment:
       @eolivelli Yes and no on my original comment. 
   
   My original comment was meant to make sure we return the write schema information via TopicSchema, because we use `AUTO_CONSUME` in the PulsarSink to indicate that `GenericRecord`s are published to the Pulsar topic. `AUTO_CONSUME` can be used by both sources and sinks. In order not to impact sources, I didn't add the logic to `TopicSchema`. Instead, I added it to PulsarSink to make it more explicit, which results in a one-line change similar to your initial change. But that doesn't mean your original and current implementations are heading in the right direction.
   
   The main problem with your previous and current implementations in #9481 is that you are trying to hijack the existing AVRO implementation to introduce support for lazy schema initialization. Lazy schema initialization is already implemented as part of multi-schema write support, so you don't need to add such a hack.







[GitHub] [pulsar] codelipenghui commented on pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#issuecomment-783861958


   /pulsarbot run-failure-checks





[GitHub] [pulsar] eolivelli commented on a change in pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#discussion_r596069056



##########
File path: tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/GenericRecordSource.java
##########
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+
+/**
+ * A source that generates {@link GenericRecord}s.
+ */
+@Slf4j
+public class GenericRecordSource implements Source<GenericRecord> {
+
+    private RecordSchemaBuilder recordSchemaBuilder;
+    private GenericSchema<GenericRecord> schema;
+    private List<Field> fields;
+    private AtomicInteger count = new AtomicInteger();
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        this.recordSchemaBuilder = SchemaBuilder.record("MyBean");
+        this.recordSchemaBuilder.field("number").type(SchemaType.INT32);
+        this.recordSchemaBuilder.field("text").type(SchemaType.STRING);
+        schema = Schema.generic(this.recordSchemaBuilder.build(SchemaType.AVRO));
+        fields = Arrays.asList(new Field("number", 0),
+            new Field("text", 1));
+        log.info("created source, schema {}", new String(schema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public Record<GenericRecord> read() throws Exception {
+        // slow down the production of values
+        Thread.sleep(20);
+
+        int value = count.incrementAndGet();
+        GenericRecord record = schema.newRecordBuilder()

Review comment:
       @sijie  okay
   I will check downstream if using RecordSchemaBuilder is a valid option.
   thanks
   







[GitHub] [pulsar] eolivelli commented on a change in pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#discussion_r593849914



##########
File path: tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/GenericRecordSource.java
##########
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+
+/**
+ * A source that generates {@link GenericRecord}s.
+ */
+@Slf4j
+public class GenericRecordSource implements Source<GenericRecord> {
+
+    private RecordSchemaBuilder recordSchemaBuilder;
+    private GenericSchema<GenericRecord> schema;
+    private List<Field> fields;
+    private AtomicInteger count = new AtomicInteger();
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        this.recordSchemaBuilder = SchemaBuilder.record("MyBean");
+        this.recordSchemaBuilder.field("number").type(SchemaType.INT32);
+        this.recordSchemaBuilder.field("text").type(SchemaType.STRING);
+        schema = Schema.generic(this.recordSchemaBuilder.build(SchemaType.AVRO));
+        fields = Arrays.asList(new Field("number", 0),
+            new Field("text", 1));
+        log.info("created source, schema {}", new String(schema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public Record<GenericRecord> read() throws Exception {
+        // slow down the production of values
+        Thread.sleep(20);
+
+        int value = count.incrementAndGet();
+        GenericRecord record = schema.newRecordBuilder()

Review comment:
       @sijie @codelipenghui unfortunately this is not exactly like my original test case, which reproduced my use case:
   being able to push an object that implements GenericRecord.
   
   Here you are using the builder provided by Pulsar, but this is not enough for me: my user would like to use an object from their own domain, just by implementing Pulsar's GenericRecord Java interface, because that saves resources (allocations and cycles).
   
   
   https://github.com/apache/pulsar/pull/9481/files#diff-bbdf586ddad181a0a9dae17974b19ca5cbbce398716ec7fa5b4c45b69be58f41R66
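
To make the use case above concrete: the idea is that a domain object exposes its fields through a record interface directly, rather than being copied into a builder-produced record for every message. The following is only a rough sketch under stated assumptions. `RecordView` is a simplified local stand-in and is not Pulsar's `GenericRecord` interface; `Order` is a made-up domain class.

```java
import java.util.Arrays;
import java.util.List;

public class DomainRecordSketch {

    // Simplified stand-in for a generic record interface (NOT the Pulsar API).
    interface RecordView {
        List<String> fieldNames();

        Object field(String name);
    }

    // Hypothetical domain object that exposes its own state as a record,
    // so no intermediate record object has to be allocated per message.
    static final class Order implements RecordView {
        private static final List<String> FIELDS = Arrays.asList("number", "text");

        private final int number;
        private final String text;

        Order(int number, String text) {
            this.number = number;
            this.text = text;
        }

        @Override
        public List<String> fieldNames() {
            return FIELDS;
        }

        @Override
        public Object field(String name) {
            switch (name) {
                case "number":
                    return number;
                case "text":
                    return text;
                default:
                    throw new IllegalArgumentException("unknown field: " + name);
            }
        }
    }

    public static void main(String[] args) {
        RecordView record = new Order(1, "value-1");
        // Read the fields generically, the way a sink would.
        for (String name : record.fieldNames()) {
            System.out.println(name + "=" + record.field(name));
        }
    }
}
```

Whether Pulsar should accept such user-implemented records is exactly the point under discussion in this thread; the sketch only illustrates the shape of the request.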







[GitHub] [pulsar] codelipenghui merged pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590


   





[GitHub] [pulsar] eolivelli commented on a change in pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#discussion_r576752538



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
##########
@@ -400,7 +415,16 @@ public void close() throws Exception {
         ConsumerConfig consumerConfig = new ConsumerConfig();
         consumerConfig.setSchemaProperties(pulsarSinkConfig.getSchemaProperties());
         if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
-            consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
+            if (GenericRecord.class.isAssignableFrom(typeArg)) {

Review comment:
       @sijie initially you pointed out that working here in PulsarSink is not the right way, but we should only work on TopicSchema
   https://github.com/apache/pulsar/pull/9481#discussion_r570723440
   
   In fact I believe that in my PR #9481 I took the right way, driven by your suggestions.
   
   I believe that this change is not enough to support my needs.
    
    BTW, if the integration test I added to #9481 works with this patch, then we can converge on a good solution.
    My goal is to get that use case working, in the way that is best for the project in the mid/long term.







[GitHub] [pulsar] sijie commented on a change in pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#discussion_r576525779



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
##########
@@ -400,7 +415,16 @@ public void close() throws Exception {
         ConsumerConfig consumerConfig = new ConsumerConfig();
         consumerConfig.setSchemaProperties(pulsarSinkConfig.getSchemaProperties());
         if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
-            consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
+            if (GenericRecord.class.isAssignableFrom(typeArg)) {
+                consumerConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
+                SchemaType configuredSchemaType = SchemaType.valueOf(pulsarSinkConfig.getSchemaType());
+                if (SchemaType.AUTO_CONSUME != configuredSchemaType) {

Review comment:
       No. The schema type is already overwritten in line 419. This is just to log an info message to indicate that the schema type has been overwritten to `AUTO_CONSUME`.
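
The behavior described in this reply can be sketched in isolation. The snippet below is a minimal, self-contained illustration and not the code in `PulsarSink`: the `SchemaType` enum and `GenericRecord` interface are local stand-ins for the Pulsar types, and `resolveSchemaType` and `MyRecord` are hypothetical names introduced for the example.

```java
import java.util.logging.Logger;

public class SchemaTypeOverrideSketch {
    private static final Logger LOG = Logger.getLogger("SchemaTypeOverrideSketch");

    // Local stand-ins for the Pulsar types (assumptions for illustration only).
    enum SchemaType { AVRO, JSON, STRING, AUTO_CONSUME }

    interface GenericRecord { }

    static class MyRecord implements GenericRecord { }

    // Hypothetical helper: returns the schema type the consumer config ends up with.
    static String resolveSchemaType(Class<?> typeArg, String configured) {
        if (configured == null || configured.isEmpty()) {
            return null; // nothing configured, so nothing is set
        }
        if (GenericRecord.class.isAssignableFrom(typeArg)) {
            // The schema type is overwritten unconditionally for GenericRecord types.
            String effective = SchemaType.AUTO_CONSUME.toString();
            // The comparison below only decides whether to log; it does not
            // change the effective schema type.
            SchemaType configuredType = SchemaType.valueOf(configured);
            if (configuredType != SchemaType.AUTO_CONSUME) {
                LOG.info("schema type " + configuredType + " overwritten to " + effective);
            }
            return effective;
        }
        return configured;
    }

    public static void main(String[] args) {
        System.out.println(resolveSchemaType(MyRecord.class, "AVRO"));   // AUTO_CONSUME
        System.out.println(resolveSchemaType(String.class, "STRING"));   // STRING
    }
}
```

Under these assumptions, no second `setSchemaType` call is needed inside the inner `if`, which is the point of the reply above.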







[GitHub] [pulsar] sijie commented on a change in pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#discussion_r594798991



##########
File path: tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/GenericRecordSource.java
##########
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+
+/**
+ * A source that generates {@link GenericRecord}s.
+ */
+@Slf4j
+public class GenericRecordSource implements Source<GenericRecord> {
+
+    private RecordSchemaBuilder recordSchemaBuilder;
+    private GenericSchema<GenericRecord> schema;
+    private List<Field> fields;
+    private AtomicInteger count = new AtomicInteger();
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        this.recordSchemaBuilder = SchemaBuilder.record("MyBean");
+        this.recordSchemaBuilder.field("number").type(SchemaType.INT32);
+        this.recordSchemaBuilder.field("text").type(SchemaType.STRING);
+        schema = Schema.generic(this.recordSchemaBuilder.build(SchemaType.AVRO));
+        fields = Arrays.asList(new Field("number", 0),
+            new Field("text", 1));
+        log.info("created source, schema {}", new String(schema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public Record<GenericRecord> read() throws Exception {
+        // slow down the production of values
+        Thread.sleep(20);
+
+        int value = count.incrementAndGet();
+        GenericRecord record = schema.newRecordBuilder()

Review comment:
       GenericRecord was designed to be produced via RecordSchemaBuilder; people are not expected to implement GenericRecord directly. I don't understand the allocations and cycles issue. If there is an allocations or cycles issue, it should be fixed in RecordSchemaBuilder, not worked around by implementing `GenericRecord` directly.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #9590: Support writing general records to Pulsar sink

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #9590:
URL: https://github.com/apache/pulsar/pull/9590#discussion_r576525196



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
##########
@@ -400,7 +415,16 @@ public void close() throws Exception {
         ConsumerConfig consumerConfig = new ConsumerConfig();
         consumerConfig.setSchemaProperties(pulsarSinkConfig.getSchemaProperties());
         if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
-            consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
+            if (GenericRecord.class.isAssignableFrom(typeArg)) {
+                consumerConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
+                SchemaType configuredSchemaType = SchemaType.valueOf(pulsarSinkConfig.getSchemaType());
+                if (SchemaType.AUTO_CONSUME != configuredSchemaType) {

Review comment:
       Do we need to add `consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());` in this branch?



