You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/02 23:59:00 UTC
[GitHub] [pulsar] dlg99 opened a new pull request #11905: Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly
dlg99 opened a new pull request #11905:
URL: https://github.com/apache/pulsar/pull/11905
### Motivation
- Kafka Connect Sink expects KC Data Struct instead of Avro or Json.
- at some version of Kafka Connect, tasks got `preCommit()` method with default implementation that simply calls `flush()`. Some tasks override `preCommit()` directly, now KCA Sink supports it.
### Modifications
- support for `preCommit()`
- Avro and Json remapped into Kafka's Struct for the sinks to handle the data correctly
- updated unit tests
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
- updated tests in KafkaConnectSinkTest
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
No
### Documentation
- [X] no-need-doc
no changes in API
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] congbobo184 commented on a change in pull request #11905: Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly
Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #11905:
URL: https://github.com/apache/pulsar/pull/11905#discussion_r711558908
##########
File path: pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
##########
@@ -257,6 +265,8 @@ private void recordSchemaTest(Object value, Schema schema, Object expectedKey, S
.failFunction(status::decrementAndGet)
.build();
+ org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(schema);
Review comment:
This doesn't seem to be used
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] eolivelli commented on pull request #11905: Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly
Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #11905:
URL: https://github.com/apache/pulsar/pull/11905#issuecomment-923975021
@congbobo184 are you happy now with this patch ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] eolivelli commented on a change in pull request #11905: Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly
Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #11905:
URL: https://github.com/apache/pulsar/pull/11905#discussion_r702302900
##########
File path: pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
##########
@@ -390,7 +454,7 @@ public void unknownRecordSchemaTest() throws Exception {
@Test
public void KeyValueSchemaTest() throws Exception {
KeyValue<Integer, String> kv = new KeyValue<>(11, "value");
- recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING");
+ SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING");
Review comment:
Did you want to add some assertion around this unused variable?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] eolivelli merged pull request #11905: Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly
Posted by GitBox <gi...@apache.org>.
eolivelli merged pull request #11905:
URL: https://github.com/apache/pulsar/pull/11905
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org