Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/02 23:59:00 UTC

[GitHub] [pulsar] dlg99 opened a new pull request #11905: Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly

dlg99 opened a new pull request #11905:
URL: https://github.com/apache/pulsar/pull/11905


   ### Motivation
   
   - Kafka Connect sinks expect Kafka Connect `Struct` data rather than Avro or JSON records.
   - As of some version of Kafka Connect, sink tasks have a `preCommit()` method whose default implementation simply calls `flush()`. Some tasks override `preCommit()` directly; the KCA Sink now supports this.
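
The `flush()`/`preCommit()` relationship described above can be sketched as follows. This is only an illustration, not the code from this PR: `SketchSinkTask`, `FlushOnlyTask`, `PreCommitTask` and `SketchAdaptor` are hypothetical stand-ins, and offsets are modeled as a plain `Map<String, Long>` instead of Kafka's `TopicPartition`/`OffsetAndMetadata` types so that the example compiles without the Kafka Connect jar.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka Connect's SinkTask, reduced to the two
// methods discussed above. Offsets are modeled as partition name -> offset.
abstract class SketchSinkTask {
    // Older hook: write out anything that is still buffered.
    void flush(Map<String, Long> currentOffsets) { }

    // Default behavior: flush, then report every offset as safe to commit.
    Map<String, Long> preCommit(Map<String, Long> currentOffsets) {
        flush(currentOffsets);
        return currentOffsets;
    }
}

// A task that only implements flush() and relies on the default preCommit().
class FlushOnlyTask extends SketchSinkTask {
    int flushCalls = 0;

    @Override
    void flush(Map<String, Long> currentOffsets) {
        flushCalls++;
    }
}

// A task that overrides preCommit() directly and never implements flush().
class PreCommitTask extends SketchSinkTask {
    long durableUpTo = 0; // assumed: highest offset the task has written durably

    @Override
    Map<String, Long> preCommit(Map<String, Long> currentOffsets) {
        Map<String, Long> committable = new HashMap<>();
        // Only offsets that are already durable may be committed.
        currentOffsets.forEach((p, off) -> committable.put(p, Math.min(off, durableUpTo)));
        return committable;
    }
}

// Adaptor side: calling preCommit() rather than flush() covers both task styles.
class SketchAdaptor {
    static Map<String, Long> commit(SketchSinkTask task, Map<String, Long> offsets) {
        return task.preCommit(offsets);
    }
}
```

An adaptor that called only `flush()` would never invoke the logic in `PreCommitTask`; going through `preCommit()` still reaches `flush()` for tasks that keep the default.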
   
   ### Modifications
   
   - added support for `preCommit()`
   - Avro and JSON values are remapped into Kafka Connect's `Struct` so that sinks handle the data correctly
   - updated unit tests
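
A rough sketch of the remapping idea, assuming a flat record: each field named by the schema is copied from the decoded Avro/JSON record into a struct. `SketchSchema`, `SketchStruct` and `RecordRemapper` are hypothetical stand-ins that loosely mimic Kafka Connect's `Schema` and `Struct`; the decoded record is modeled as a `Map`. It is not the implementation in this PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Kafka Connect struct schema: ordered field names only.
final class SketchSchema {
    final List<String> fieldNames;

    SketchSchema(List<String> fieldNames) {
        this.fieldNames = new ArrayList<>(fieldNames);
    }
}

// Hypothetical stand-in for Kafka Connect's Struct: values keyed by schema field name.
final class SketchStruct {
    final SketchSchema schema;
    private final Map<String, Object> values = new LinkedHashMap<>();

    SketchStruct(SketchSchema schema) {
        this.schema = schema;
    }

    // Rejects fields that the schema does not declare.
    SketchStruct put(String field, Object value) {
        if (!schema.fieldNames.contains(field)) {
            throw new IllegalArgumentException("Unknown field: " + field);
        }
        values.put(field, value);
        return this;
    }

    Object get(String field) {
        return values.get(field);
    }
}

final class RecordRemapper {
    // Copies every schema field from a decoded record (a Map standing in for an
    // Avro GenericRecord or a parsed JSON object) into a struct. Fields present
    // in the record but absent from the schema are dropped.
    static SketchStruct toStruct(Map<String, Object> record, SketchSchema schema) {
        SketchStruct struct = new SketchStruct(schema);
        for (String field : schema.fieldNames) {
            struct.put(field, record.get(field));
        }
        return struct;
    }
}
```

Nested records, type conversion and optional fields are left out of this sketch.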
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
     - updated tests in KafkaConnectSinkTest 
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
   No
   
   ### Documentation
     
   - [X] no-need-doc 
   
   No changes to the API.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #11905: Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #11905:
URL: https://github.com/apache/pulsar/pull/11905#discussion_r711558908



##########
File path: pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
##########
@@ -257,6 +265,8 @@ private void recordSchemaTest(Object value, Schema schema, Object expectedKey, S
                 .failFunction(status::decrementAndGet)
                 .build();
 
+        org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(schema);

Review comment:
       This doesn't seem to be used







[GitHub] [pulsar] eolivelli commented on pull request #11905: Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #11905:
URL: https://github.com/apache/pulsar/pull/11905#issuecomment-923975021


   @congbobo184 are you happy with this patch now?





[GitHub] [pulsar] eolivelli commented on a change in pull request #11905: Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #11905:
URL: https://github.com/apache/pulsar/pull/11905#discussion_r702302900



##########
File path: pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
##########
@@ -390,7 +454,7 @@ public void unknownRecordSchemaTest() throws Exception {
     @Test
     public void KeyValueSchemaTest() throws Exception {
         KeyValue<Integer, String> kv = new KeyValue<>(11, "value");
-        recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING");
+        SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING");

Review comment:
       Did you want to add some assertion around this unused variable?







[GitHub] [pulsar] eolivelli merged pull request #11905: Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly

Posted by GitBox <gi...@apache.org>.
eolivelli merged pull request #11905:
URL: https://github.com/apache/pulsar/pull/11905


   

