Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/17 03:30:17 UTC

[GitHub] [kafka] C0urante opened a new pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support

C0urante opened a new pull request #11773:
URL: https://github.com/apache/kafka/pull/11773


   Adds the new exactly-once-related source connector APIs described in [KIP-618].
   
   Note that these APIs are not used by the framework in this PR, just defined. They will be used in downstream PRs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#issuecomment-1056069349


   Thanks @tombentley!





[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r817294047



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer transaction boundaries when
+ * exactly-once support is enabled.
+ */
+public interface TransactionContext {
+
+    /**
+     * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
+     * is processed.
+     */
+    void commitTransaction();
+
+    /**
+     * Request a transaction commit after a source record is processed. The source record will be the
+     * last record in the committed transaction.
+     * @param record the record to commit the transaction after; may not be null.
+     */
+    void commitTransaction(SourceRecord record);
+
+    /**
+     * Requests a transaction abort the next batch of records from {@link SourceTask#poll()}. All of
+     * the records in that transaction will be discarded and will not appear in a committed transaction.
+     * However, offsets for that transaction will still be committed. If the data should be reprocessed,
+     * the task should not invoke this method and should instead throw an exception.
+     */
+    void abortTransaction();
+
+    /**
+     * Requests a transaction abort after a source record is processed. The source record will be the
+     * last record in the aborted transaction. All of the records in that transaction will be discarded
+     * and will not appear in a committed transaction. However, offsets for that transaction will still
+     * be committed. If the data should be reprocessed, the task should not invoke this method and

Review comment:
       Ack, will do.







[GitHub] [kafka] tombentley commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r819434434



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.

Review comment:
       Ultimately there's a cyclic dependency:
   
   1. We (or at least I) would like `Connector::exactlyOnceSupport` and `Connector::canDefineTransactionBoundaries` to be called only with valid config parameters (implying that `Connector::validate` has been called first).
   2. But we'd like to call `Connector::validate` having first validated `transaction.boundary` and `exactly.once.support` (which implies calling `Connector::exactlyOnceSupport` and `Connector::canDefineTransactionBoundaries`).
   
   But in 2., do we really need to call `Connector::exactlyOnceSupport` and `Connector::canDefineTransactionBoundaries`, or is it enough to validate that `transaction.boundary` and `exactly.once.support` are legal enum values (terminating early if they're not), and then call `Connector::exactlyOnceSupport` and `Connector::canDefineTransactionBoundaries` after `Connector::validate`? We could add any second-phase `exactlyOnceSupport` and `canDefineTransactionBoundaries` errors to the `ConfigValue`s returned from `validate`. That would then establish the pattern of per-config syntax validation happening before any inter-config validation (since typically a connector overriding `validate` would call `super`'s impl first, thus achieving `ConfigDef` validation). I realise the KIP approved these signatures (so it's getting a bit late to change them), but I think that would allow `Connector::exactlyOnceSupport(Config)` and `Connector::canDefineTransactionBoundaries(Config)`, which would make the ordering much more explicit purely from the types involved. Or am I asking the impossible (`AbstractHerder#validateConnectorConfig` is not the easiest piece of code to reason about)?
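The ordering proposed in this comment could be sketched roughly as follows. This is a simplified illustration and not the `AbstractHerder` logic: `SketchConnector`, `TwoPhaseValidationSketch`, the error strings, and the assumed legal values `requested`/`required` are all hypothetical stand-ins, and the connector's `validate` is reduced to a map from property name to error messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the connector-facing API; not the real Connect classes.
interface SketchConnector {
    // Per-property validation: property name -> error messages (empty list if valid).
    Map<String, List<String>> validate(Map<String, String> config);

    // Inter-config check; only called once the config has passed validate().
    boolean exactlyOnceSupported(Map<String, String> config);
}

class TwoPhaseValidationSketch {
    // Assumed legal values for exactly.once.support; treat these as illustrative.
    static final List<String> LEGAL_SUPPORT_VALUES = Arrays.asList("requested", "required");

    static Map<String, List<String>> validate(SketchConnector connector, Map<String, String> config) {
        Map<String, List<String>> errors = new HashMap<>();

        // Syntax check of the framework-level property; terminate early if it is illegal.
        String support = config.getOrDefault("exactly.once.support", "requested");
        if (!LEGAL_SUPPORT_VALUES.contains(support.toLowerCase(Locale.ROOT))) {
            errors.computeIfAbsent("exactly.once.support", k -> new ArrayList<>())
                  .add("Invalid value: " + support);
            return errors;
        }

        // First phase: the connector's own per-config validation.
        for (Map.Entry<String, List<String>> e : connector.validate(config).entrySet()) {
            errors.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).addAll(e.getValue());
        }
        boolean configValid = errors.values().stream().allMatch(List::isEmpty);

        // Second phase: inter-config validation, only on a config that passed the first phase.
        if (configValid && "required".equalsIgnoreCase(support)
                && !connector.exactlyOnceSupported(config)) {
            errors.computeIfAbsent("exactly.once.support", k -> new ArrayList<>())
                  .add("Connector cannot provide exactly-once guarantees with this configuration");
        }
        return errors;
    }
}
```

The trade-off raised later in the thread is visible here: when the first phase reports any error, the second phase is skipped, so exactly-once problems only surface on a follow-up validation attempt.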







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r820917219



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.

Review comment:
       @tombentley 
   
   Definitely in favor of doing single-property validations before multi-property validations, and as far as single-property validations and short-circuiting go, the downstream [preflight validation PR](https://github.com/apache/kafka/pull/11776) already performs short-circuiting [if the `exactly.once.support` property is invalid](https://github.com/C0urante/kafka/blob/b4976e82aec1c864bd25d660b4042a71e03b7c47/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L878) by skipping the call to `SourceConnector::exactlyOnceSupport`, and does the same for [the `transaction.boundary` property](https://github.com/C0urante/kafka/blob/b4976e82aec1c864bd25d660b4042a71e03b7c47/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L925) and `SourceConnector::canDefineTransactionBoundaries` method as well.
   
   Can you clarify why you prefer to also short-circuit if any errors are reported by single-property or multi-property validation beyond ones reported for the `exactly.once.support` and `transaction.boundary` properties? It comes with a downside that we'd end up forcing two validation passes on users in order to discover any issues with the `exactly.once.support` and `transaction.boundary` properties, even if they could potentially be surfaced by the connector despite the issues with its config.
   
   Not sure passing in a [Config](https://github.com/apache/kafka/blob/3be978464ca01c29241954516b4d952f69d4e16d/clients/src/main/java/org/apache/kafka/common/config/Config.java) object would be the best way to address this, but it is better than nothing. It doesn't seem like a very ergonomic class to work with for follow-up validation attempts and is better suited for rendering final results to users (which, IMO, is part of the reason that the preflight validation logic in `AbstractHerder` is difficult to follow). Something like a `Map<String, ConfigValue>` might be more useful since I imagine most connector authors would end up either deriving one of their own from the `Config` object or using Java 8 streams logic to find the `ConfigValue` for a single property or subset of properties. But I'm also hesitant about this approach since I'm not sure how we'd want to handle properties that are present in the raw connector config, but which the connector doesn't define a `ConfigValue` for in the `Config` returned from `Connector::validate`. Maybe I'm overthinking this but I can't shake the feeling that either way we go on that front (either discard those properties and don't include them in `SourceConnector::exactlyOnceSupport` and `SourceConnector::canDefineTransactionBoundaries`, or include them with `ConfigValue` objects constructed by the framework and inserted into the resulting `Config` or `Map<String, ConfigValue>` object), we're going to end up introducing some footguns into the logic here that are just going to frustrate developers who want to implement these methods without having to read paragraphs of documentation or go through framework source code. One practical example of this is the `transaction.boundary` property--most connectors won't (and shouldn't) define this property themselves, so we'd have to decide if we'd want to provide that property to connectors in `SourceConnector::exactlyOnceSupport`, and if so, how.
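For a sense of what deriving a `Map<String, ConfigValue>` from a `Config` might look like on the connector side, here is a minimal sketch. `ConfigValueSketch` and `ConfigSketch` are stand-ins written only so the example compiles without Kafka on the classpath; their accessors are assumed to mirror `ConfigValue#name`, `ConfigValue#errorMessages`, and `Config#configValues`. `ConfigLookupSketch` is hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Stand-in for org.apache.kafka.common.config.ConfigValue (assumed accessor names).
class ConfigValueSketch {
    private final String name;
    private final List<String> errorMessages;

    ConfigValueSketch(String name, List<String> errorMessages) {
        this.name = name;
        this.errorMessages = errorMessages;
    }

    String name() { return name; }
    List<String> errorMessages() { return errorMessages; }
}

// Stand-in for org.apache.kafka.common.config.Config (assumed accessor name).
class ConfigSketch {
    private final List<ConfigValueSketch> configValues;

    ConfigSketch(List<ConfigValueSketch> configValues) {
        this.configValues = configValues;
    }

    List<ConfigValueSketch> configValues() { return configValues; }
}

class ConfigLookupSketch {
    // Index the validated values by property name using streams.
    static Map<String, ConfigValueSketch> byName(ConfigSketch config) {
        return config.configValues().stream()
                .collect(Collectors.toMap(ConfigValueSketch::name, Function.identity()));
    }

    // True only if the property was validated and has at least one error.
    static boolean hasErrors(ConfigSketch config, String property) {
        ConfigValueSketch value = byName(config).get(property);
        // A property the connector never defined (for example transaction.boundary)
        // has no entry at all, which is the ambiguity described above.
        return value != null && !value.errorMessages().isEmpty();
    }
}
```

Note that `hasErrors` cannot tell "valid" apart from "not defined by the connector"; both come back as `false`, which is one concrete form of the footgun mentioned in the comment.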
   
   I think an ideal solution here might be opinionated but flexible: we can provide special accommodations for idiomatic usage patterns with the Connect API (such as attaching special meaning to thrown `ConfigException` instances, as is already done during [`ConfigDef` config validation](https://github.com/apache/kafka/blob/bbb2dc54a0f45bc5455f22a0671adde206dcfa29/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L592-L594), or how we [handle `RetriableException` instances specially for source tasks](https://github.com/apache/kafka/blob/bbb2dc54a0f45bc5455f22a0671adde206dcfa29/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L306-L310)), but allow connector authors enough breathing room to try to surface as much information as possible in a single validation pass if they want to put in the extra work to make the UX for their connector as smooth as possible.
   
   From an API design perspective, I think adding a `Config` argument to the new methods here certainly accomplishes both of these; I'm just stuck on a few implementation details that I'm not sure we can overcome.
   
   @mimaison 
   
   Fair enough! There's definitely some potential for out-of-scope work here (like following up on KIP-419 or developing alternatives to it), but at the very least we can count the newly-introduced APIs as in-scope and should provide these types of guarantees for them.
   
   **@ both**
   
   It may be easier to go over the details of the precise behavior we want here on the [preflight validation PR](https://github.com/apache/kafka/pull/11776). If we prefer a high-level discussion it's probably best to keep things here, but since we're getting fairly granular now, it might help to see what's been drafted so far implementation-wise.







[GitHub] [kafka] tombentley commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r818483442



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.

Review comment:
       It seems relevant to the implementer whether the configs have been validated or not. If they're not guaranteed by the contract to have been validated then the implementer might need to duplicate some of the validation logic in this method.







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r812440623



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.
+     *
+     * @param connectorConfig the configuration that will be used for the connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support with the given
+     * configuration, and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If this method is overridden by a
+     * connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot provide
+     * exactly-once guarantees.
+     * @since 3.2
+     */
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+        return null;

Review comment:
       This follows the behavior specified in the KIP, so changing it now would be a little difficult (although it's not out of the question as long as the change is warranted).
   
   However, I think this is better as-is. We default to `null` here in order to distinguish between connectors that might be able to provide exactly-once guarantees but just haven't implemented this method yet, and connectors that definitely cannot provide exactly-once guarantees. This comes in handy during preflight validation when a user has set `exactly.once.support` to `required` in their connector config; we can give users a more informative error message if the connector doesn't return `ExactlyOnceSupport.SUPPORTED` from this method.
   
   See the preflight validation logic in [DistributedHerder](https://github.com/C0urante/kafka/blob/fbd06b01ff168380341aaa56252a0a0f0556ae4c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L892-L899) that will be added in https://github.com/apache/kafka/pull/11776 for more detail.
   
   The reason that we can provide a default for `canDefineTransactionBoundaries` is that none of the existing connectors that are already built, published, and part of the Connect ecosystem have had a chance to implement the API necessary for connector-defined transaction boundaries yet, since that API hasn't been published in an official release.
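To make the three-way distinction concrete, a preflight check along these lines could pick its error message based on whether the connector returned `null`, `UNSUPPORTED`, or `SUPPORTED`. This is only a sketch: `ExactlyOnceSupportSketch` stands in for the enum in the diff, `PreflightMessageSketch` is hypothetical, and the message wording is invented for illustration rather than taken from `DistributedHerder`.

```java
import java.util.Optional;

// Stand-in for the ExactlyOnceSupport enum referenced in the diff above.
enum ExactlyOnceSupportSketch { SUPPORTED, UNSUPPORTED }

class PreflightMessageSketch {
    // Returns an error message for the exactly.once.support property, if any.
    static Optional<String> errorFor(String exactlyOnceSupportProperty,
                                     ExactlyOnceSupportSketch reported) {
        // Only a "required" setting turns missing support into a validation error.
        if (!"required".equalsIgnoreCase(exactlyOnceSupportProperty)) {
            return Optional.empty();
        }
        if (reported == null) {
            // Default implementation: the connector has not declared anything either way.
            return Optional.of("The connector does not implement the API needed to declare "
                    + "exactly-once support; it may still be able to provide it.");
        }
        if (reported == ExactlyOnceSupportSketch.UNSUPPORTED) {
            // Explicit answer: the connector cannot do it with this configuration.
            return Optional.of("The connector does not support exactly-once delivery "
                    + "guarantees with the provided configuration.");
        }
        return Optional.empty();
    }
}
```

Had the default been `UNSUPPORTED` instead of `null`, the first two branches would collapse into one and the more informative message for not-yet-updated connectors would be lost.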







[GitHub] [kafka] tombentley commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r816664173



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.

Review comment:
       "Connector authors" might be less ambiguous than "Developers".

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.

Review comment:
       Should we be clearer as to which will be called first?

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer transaction boundaries when
+ * exactly-once support is enabled.
+ */
+public interface TransactionContext {
+
+    /**
+     * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
+     * is processed.
+     */
+    void commitTransaction();
+
+    /**
+     * Request a transaction commit after a source record is processed. The source record will be the
+     * last record in the committed transaction.
+     * @param record the record to commit the transaction after; may not be null.
+     */
+    void commitTransaction(SourceRecord record);
+
+    /**
+     * Requests a transaction abort the next batch of records from {@link SourceTask#poll()}. All of
+     * the records in that transaction will be discarded and will not appear in a committed transaction.
+     * However, offsets for that transaction will still be committed. If the data should be reprocessed,
+     * the task should not invoke this method and should instead throw an exception.
+     */
+    void abortTransaction();
+
+    /**
+     * Requests a transaction abort after a source record is processed. The source record will be the
+     * last record in the aborted transaction. All of the records in that transaction will be discarded
+     * and will not appear in a committed transaction. However, offsets for that transaction will still
+     * be committed. If the data should be reprocessed, the task should not invoke this method and

Review comment:
       Similar rewording?

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer transaction boundaries when
+ * exactly-once support is enabled.
+ */
+public interface TransactionContext {
+
+    /**
+     * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
+     * is processed.
+     */
+    void commitTransaction();
+
+    /**
+     * Request a transaction commit after a source record is processed. The source record will be the
+     * last record in the committed transaction.
+     * @param record the record to commit the transaction after; may not be null.
+     */
+    void commitTransaction(SourceRecord record);
+
+    /**
+     * Requests a transaction abort the next batch of records from {@link SourceTask#poll()}. All of
+     * the records in that transaction will be discarded and will not appear in a committed transaction.
+     * However, offsets for that transaction will still be committed. If the data should be reprocessed,

Review comment:
       ```suggestion
        * However, offsets for that transaction will still be committed so that the records in that transaction are not reprocessed. If the data should instead be reprocessed,
   ```
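As an illustration of the contract this javadoc describes, a task might request commits and aborts at specific records as sketched below. Everything here is a stand-in: `TransactionContextSketch` copies the four method shapes from the diff with `String` in place of `SourceRecord`, and `BoundaryDefiningPollSketch` with its `:end` and `:corrupt` suffix convention is purely hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in mirroring the four methods of TransactionContext in the diff above,
// with String used in place of SourceRecord.
interface TransactionContextSketch {
    void commitTransaction();
    void commitTransaction(String record);
    void abortTransaction();
    void abortTransaction(String record);
}

// Hypothetical task-side logic that picks transaction boundaries per record.
class BoundaryDefiningPollSketch {
    static final String END_SUFFIX = ":end";         // hypothetical "unit of work complete" marker
    static final String CORRUPT_SUFFIX = ":corrupt"; // hypothetical "discard this unit" marker

    static List<String> poll(List<String> upstream, TransactionContextSketch context) {
        List<String> batch = new ArrayList<>(upstream);
        for (String record : batch) {
            if (record.endsWith(END_SUFFIX)) {
                // This record becomes the last one in the committed transaction.
                context.commitTransaction(record);
            } else if (record.endsWith(CORRUPT_SUFFIX)) {
                // Records in this transaction are dropped, but per the javadoc their offsets
                // are still committed, so they are not reprocessed. A task that wants the
                // data reprocessed would throw an exception instead.
                context.abortTransaction(record);
            }
        }
        return batch;
    }
}
```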







[GitHub] [kafka] mimaison commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r824001349



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.

Review comment:
       Potentially controversial idea: Have you considered always calling `start()` first? That way the connector should have computed its configuration and should be able to easily handle `exactlyOnceSupport()` and `canDefineTransactionBoundaries()`. Obviously `start()` may cause the connector to do some work before the runtime stops it in case of an invalid configuration. But since the proposed javadoc hinted it could be called first anyway, it's not making things worse.
   
   The configuration validation logic is getting more and more complex and showing the limits of the current APIs. I wonder if having a `configure(Map)` method would help us. 







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r818829273



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.

Review comment:
       Hmmmm... I think this is valid and could be useful but there are a few complications.
   
   If we want to guarantee that the config has been validated first (via `Connector::validate` and possibly with other checks like making sure that the value for `transaction.boundary` is valid), should we then only invoke this method if there are no errors in the connector config?
   
   This might make life easier for connector authors who want to be able to instantiate an `AbstractConfig` subclass for their connector and then make decisions about its exactly-once viability based on the already-parsed values from that config class instead of, like you say, having to duplicate that validation logic in this method.
   
   On the other hand, it'll prevent issues with exactly-once support from surfacing the first time around and will require a follow-up validation attempt to discover any, even if the problems with the connector config don't impact its ability to provide exactly-once guarantees.
   
   Thinking about just this specific use case (instantiating an `AbstractConfig` subclass inside this method), I think it'd be best to do two things:
   1. Add a `catch` clause specifically for `ConfigException` instances around the calls to `Connector::exactlyOnceSupport` and `Connector::canDefineTransactionBoundaries` that gets translated into a specific error message stating that the connector config appears to be invalid and exactly-once support for the connector cannot be determined (instead of the [existing "An unexpected error occurred..." message](https://github.com/C0urante/kafka/blob/c687a12af3ec6838140bedf17efae4e9e5c19df1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L907)). This should allow connector authors to instantiate their specific `AbstractConfig` subclass within this method and not have to worry about running into something like [KAFKA-13327](https://issues.apache.org/jira/browse/KAFKA-13327), which causes 500 errors to be thrown during some connector config validations when it's possible (and a much better UX) to add an error message to the validated config.
   2. Add a `try`/`catch` block around the [parsing logic for TransactionBoundaries](https://github.com/C0urante/kafka/blob/69d63eda9c96bc76ce29b07c43f0e92c776fa5fb/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java#L67) that translates an `IllegalArgumentException` into a `ConfigException`, so that connector authors can freely invoke `TransactionBoundaries::fromProperty` on their connector config inside `SourceConnector::exactlyOnceSupport` and, if there's an issue, it'll be translated into a helpful error message to the user. And if the connector transaction boundary style isn't relevant at all to its exactly-once support, we won't double-ding the connector config with two error messages related to the `transaction.boundary` property.
   
   Do you think that this is a reasonable way to handle that specific connector development style? And are there other development styles that we should aim to accommodate?
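   
   To make the two proposed changes concrete, here is a minimal, self-contained sketch. Everything in it is a simplified stand-in and an assumption for illustration: `ConfigException` and `TransactionBoundary` mimic the real Kafka types but are declared locally, `PreflightSketch` and its method names are hypothetical, and the rule that the connector rejects the `connector` boundary style is invented. It is not the framework's actual code.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

class PreflightSketch {
    // Hypothetical connector-side logic: parses the config eagerly, so an invalid
    // value surfaces as a ConfigException instead of an arbitrary runtime error.
    static boolean connectorSupportsExactlyOnce(Map<String, String> config) {
        TransactionBoundary boundary =
            TransactionBoundary.fromProperty(config.getOrDefault("transaction.boundary", "poll"));
        // Invented rule for this sketch: this connector cannot define its own boundaries.
        return boundary != TransactionBoundary.CONNECTOR;
    }

    // Item 1: framework-side catch clause. Returns an error message, or null if there is none.
    static String exactlyOnceError(Map<String, String> config) {
        try {
            return connectorSupportsExactlyOnce(config)
                ? null
                : "The connector does not support exactly-once with this configuration";
        } catch (ConfigException e) {
            return "The connector config appears to be invalid, so exactly-once support "
                + "cannot be determined: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(exactlyOnceError(
            Collections.singletonMap("transaction.boundary", "sometimes")));
    }
}

// Local stand-in for org.apache.kafka.common.config.ConfigException.
class ConfigException extends RuntimeException {
    ConfigException(String message) {
        super(message);
    }
}

// Local stand-in for the transaction boundary enum; constant names are illustrative.
enum TransactionBoundary {
    POLL, INTERVAL, CONNECTOR;

    // Item 2: translate IllegalArgumentException into ConfigException while parsing.
    static TransactionBoundary fromProperty(String property) {
        if (property == null) {
            throw new ConfigException("transaction.boundary may not be null");
        }
        try {
            return valueOf(property.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new ConfigException("Invalid value for transaction.boundary: " + property);
        }
    }
}
```

   With this shape, a bad `transaction.boundary` value produces a single readable error on the validated config rather than a 500 response, and the connector author never has to catch anything themselves.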







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r824130593



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.

Review comment:
       Oooh, that's an interesting idea.
   
   I think your point about the connector doing extra work is my biggest concern on that front, and it seems to be validated (heh) by how `validate` is designed (we never actually invoke `validate` on the same `Connector` instances that we invoke `start` on at the moment). It may not be very frequent, but there certainly are some connectors out there that do some heavy lifting in `start` that we wouldn't want to do every time during preflight validation. For a practical example, see MirrorMaker2: https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L120-L128
   
   Adding a `configure` method would help with this issue by allowing us to give a long-lived config to a `Connector` that the connector can hold onto across successive invocations of `exactlyOnceSupport` and `canDefineTransactionBoundaries` (and possibly even `validate`). But, it would also complicate some of the existing preflight validation logic in the framework. Right now, we only create a single `Connector` instance per connector type per worker for preflight validation, and all invocations of `validate` and `config` are performed with that instance (see `AbstractHerder` [here](https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L431), [here](https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L664-L666), and [here](https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L461-L479)).
   
   Since we use these validation-only `Connector` instances pretty loosely and without any guarantees about order of invocations for `config` and `validate`, it's unlikely that there are connectors out there that store state in either of these methods, so there could be relatively low risk for creating a new validation-only `Connector` instance every time a config has to be validated.
   
   But I have to wonder if we're overthinking things. It seems like we're trying to optimize away from methods that accept a raw config map and instead only provide that config once per round of validation. Is the objective there to create a smoother/better-documented/more-strictly-defined API? Or is it to improve resource utilization by obviating the need for connector developers to construct `AbstractConfig` subclasses (or equivalents) which parse and validate individual properties at instantiation time?
   
   As an aside--I think the language in the Javadoc may need to be revisited since, as you note, it implies that these new methods will be invoked before `start`, which is not necessarily the case (for example, in the current preflight validation PR, use of these methods is mutually exclusive with the `start` method for a given `Connector` instance). **TODO: Rework Javadoc once exact behavior is agreed upon.**
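   
   For reference, the `configure` idea with a fresh validation-only instance per round could look roughly like the sketch below. This is purely hypothetical: `ConfigurableConnector`, `ExampleConnector`, `ValidationRound`, and the `offsets.tracked` property are invented names, the no-argument `exactlyOnceSupport()` and `canDefineTransactionBoundaries()` signatures are not what this PR defines, and none of it exists in the Connect API today.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

class ValidationRound {
    // Creates a fresh connector instance for each round of validation, hands it the
    // config once, then queries it; state kept by configure() cannot leak across rounds.
    static Map<String, Boolean> run(Supplier<ConfigurableConnector> factory,
                                    Map<String, String> config) {
        ConfigurableConnector connector = factory.get();
        connector.configure(config);
        Map<String, Boolean> results = new LinkedHashMap<>();
        results.put("exactlyOnceSupport", connector.exactlyOnceSupport());
        results.put("canDefineTransactionBoundaries", connector.canDefineTransactionBoundaries());
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(ExampleConnector::new,
            Collections.singletonMap("offsets.tracked", "true")));
    }
}

// Hypothetical connector shape with a configure(...) hook (not the current Connect API).
abstract class ConfigurableConnector {
    protected Map<String, String> config = Collections.emptyMap();

    // Receives the config once; later calls can reuse whatever was parsed here.
    void configure(Map<String, String> config) {
        this.config = Collections.unmodifiableMap(new HashMap<>(config));
    }

    abstract boolean exactlyOnceSupport();

    abstract boolean canDefineTransactionBoundaries();
}

// Invented example connector; "offsets.tracked" is a made-up property.
class ExampleConnector extends ConfigurableConnector {
    @Override
    boolean exactlyOnceSupport() {
        return Boolean.parseBoolean(config.getOrDefault("offsets.tracked", "false"));
    }

    @Override
    boolean canDefineTransactionBoundaries() {
        return false;
    }
}
```

   The cost of this design is one extra `Connector` instantiation per validation request, which is the trade-off discussed above.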







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r820917219



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.

Review comment:
       @tombentley 
   
   Definitely in favor of doing single-property validations before multi-property validations, and as far as single-property validations and short-circuiting go, the downstream [preflight validation PR](https://github.com/apache/kafka/pull/11776) already performs short-circuiting [if the `exactly.once.support` property is invalid](https://github.com/C0urante/kafka/blob/b4976e82aec1c864bd25d660b4042a71e03b7c47/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L878) by skipping the call to `SourceConnector::exactlyOnceSupport`, and does the same for [the `transaction.boundary` property](https://github.com/C0urante/kafka/blob/b4976e82aec1c864bd25d660b4042a71e03b7c47/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L925) and the `SourceConnector::canDefineTransactionBoundaries` method.
   
   Can you clarify why you prefer to also short-circuit if any errors are reported by single-property or multi-property validation beyond ones reported for the `exactly.once.support` and `transaction.boundary` properties? It comes with a downside that we'd end up forcing two validation passes on users in order to discover any issues with the `exactly.once.support` and `transaction.boundary` properties, even if they could potentially be surfaced by the connector despite the issues with its config.
   
   Not sure passing in a [Config](https://github.com/apache/kafka/blob/3be978464ca01c29241954516b4d952f69d4e16d/clients/src/main/java/org/apache/kafka/common/config/Config.java) object would be the best way to address this, but it is better than nothing. It doesn't seem like a very ergonomic class to work with for follow-up validation attempts and is better suited for rendering final results to users (which, IMO, is part of the reason that the preflight validation logic in `AbstractHerder` is difficult to follow). Something like a `Map<String, ConfigValue>` might be more useful since I imagine most connector authors would end up either deriving one of their own from the `Config` object or using Java 8 streams logic to find the `ConfigValue` for a single property or subset of properties. But I'm also hesitant about this approach since I'm not sure how we'd want to handle properties that are present in the raw connector config, but which the connector doesn't define a `ConfigValue` for in the `Config` returned from `Connector::validate`. Maybe I'm overthinking this but I can't shake the feeling that either way we go on that front (either discard those properties and don't include them in `SourceConnector::exactlyOnceSupport` and `SourceConnector::canDefineTransactionBoundaries`, or include them with `ConfigValue` objects constructed by the framework and inserted into the resulting `Config` or `Map<String, ConfigValue>` object), we're going to end up introducing some footguns into the logic here that are just going to frustrate developers who want to implement these methods without having to read paragraphs of documentation or go through framework source code. One practical example of this is the `transaction.boundary` property--most connectors won't (and shouldn't) define this property themselves, so we'd have to decide if we'd want to provide that property to connectors in `SourceConnector::exactlyOnceSupport`, and if so, how.
   
   I think an ideal solution here might be opinionated but flexible: we can provide special accommodations for idiomatic usage patterns with the Connect API (such as attaching special meaning to thrown `ConfigException` instances, as is already done during [`ConfigDef` config validation](https://github.com/apache/kafka/blob/bbb2dc54a0f45bc5455f22a0671adde206dcfa29/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L592-L594), or how we [handle `RetriableException` instances specially for source tasks](https://github.com/apache/kafka/blob/bbb2dc54a0f45bc5455f22a0671adde206dcfa29/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L306-L310)), but allow connector authors enough breathing room to try to surface as much information as possible in a single validation pass if they want to put in the extra work to make the UX for their connector as smooth as possible.
   
   From an API design perspective, I think adding a `Config` argument to the new methods here certainly accomplishes both of these; I'm just stuck on a few implementation details that I'm not sure we can overcome.
   
   On this front:
   
   > Or am I asking the impossible (AbstractHerder#validateConnectorConfig is not the easiest piece of code to reason about)?
   
   One thing that may be useful to point out (forgive me if you're already aware) is that it's expected that `Connector::validate` will report errors without throwing exceptions, which means that there's no implicit contract that the validation control flow gets short-circuited right now if there are any errors in the connector config. This means that we can also validate, for example, overridden Kafka client properties and whether they are permitted by the `ConnectorClientConfigOverridePolicy` set up on the worker, even if there are other errors in the connector config.
   
   @mimaison 
   
   Fair enough! There's definitely some potential for out-of-scope work here (like following up on KIP-419 or developing alternatives to it), but at the very least we can count the newly-introduced APIs as in-scope and should provide these types of guarantees for them.
   
   **@ both**
   
   It may be easier to go over the details of the precise behavior we want here on the [preflight validation PR](https://github.com/apache/kafka/pull/11776). If we prefer a high-level discussion it's probably best to keep things here, but since we're getting fairly granular now, it might help to see what's been drafted so far implementation-wise.
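   
   As a small illustration of the `Map<String, ConfigValue>` derivation mentioned earlier in this comment, a connector author might write something like the following. The `ConfigValue` class here is a simplified local stand-in for `org.apache.kafka.common.config.ConfigValue` (only `name()`, `value()`, and `errorMessages()` are modeled), and `ConfigLookupSketch` with its methods and the `file.path` property are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class ConfigLookupSketch {
    // Index validation results by property name; Collectors.toMap throws
    // IllegalStateException if two entries share a name.
    static Map<String, ConfigValue> byName(List<ConfigValue> configValues) {
        return configValues.stream()
            .collect(Collectors.toMap(ConfigValue::name, Function.identity()));
    }

    // True only if the property was reported on by validation AND has at least one error.
    // A property the connector never defined (e.g. transaction.boundary) is simply absent.
    static boolean hasErrors(Map<String, ConfigValue> values, String property) {
        ConfigValue value = values.get(property);
        return value != null && !value.errorMessages().isEmpty();
    }

    public static void main(String[] args) {
        Map<String, ConfigValue> values = byName(Arrays.asList(
            new ConfigValue("file.path", null, Collections.singletonList("Missing required value"))));
        System.out.println(values.keySet() + " transaction.boundary present: "
            + values.containsKey("transaction.boundary"));
    }
}

// Simplified stand-in for org.apache.kafka.common.config.ConfigValue.
class ConfigValue {
    private final String name;
    private final Object value;
    private final List<String> errorMessages;

    ConfigValue(String name, Object value, List<String> errorMessages) {
        this.name = name;
        this.value = value;
        this.errorMessages = errorMessages;
    }

    String name() {
        return name;
    }

    Object value() {
        return value;
    }

    List<String> errorMessages() {
        return errorMessages;
    }
}
```

   The lookup shows the footgun directly: `transaction.boundary` is missing from the map unless the framework injects a `ConfigValue` for it, so a connector that relies on the map alone cannot tell "not set" apart from "not defined by this connector".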







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r817293215



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.

Review comment:
       All else equal, I think we may want to err on the side of being a little too broad here rather than being too granular and painting ourselves into a corner in case we need to change the order of operations later.
   
   That said, do you have a case in mind where this might matter to a connector author? If there's one ordering that might be more intuitive or easier to implement then I wouldn't be opposed to sticking to it, as long as the rationale applies broadly enough that we don't think we'll need to change things later.







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r812440623



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.
+     *
+     * @param connectorConfig the configuration that will be used for the connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support with the given
+     * configuration, and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If this method is overridden by a
+     * connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot provide
+     * exactly-once guarantees.
+     * @since 3.2
+     */
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+        return null;

Review comment:
       This follows the behavior specified in the KIP, so changing it now would be a little difficult (although it's not out of the question as long as the change is warranted).
   
   However, I think this is better as-is. We default to `null` here in order to distinguish between connectors that might be able to provide exactly-once guarantees but just haven't implemented this method yet, and connectors that definitely cannot provide exactly-once guarantees. This comes in handy during preflight validation when a user has set `exactly.once.support` to `required` in their connector config; we can give users a more informative error message if the connector doesn't return `ExactlyOnceSupport.SUPPORTED` from this method.
   
   See the preflight validation logic in [DistributedHerder](https://github.com/C0urante/kafka/blob/fbd06b01ff168380341aaa56252a0a0f0556ae4c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L892-L899) that will be added in https://github.com/apache/kafka/pull/11776 for more detail.
   
   The reason that we can provide a default for `canDefineTransactionBoundaries` is that none of the existing connectors that are already built, published, and part of the Connect ecosystem have had a chance to implement the API necessary for connector-defined transaction boundaries yet, since that API hasn't been published in an official release.
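   
   A rough sketch of how the `null` default lets preflight validation produce different messages when `exactly.once.support` is `required`. The `ExactlyOnceSupport` enum is redeclared locally so the example is self-contained; `RequiredSupportCheck`, `checkRequired`, and the message wording are made up for illustration and are not taken from the `DistributedHerder` logic linked above.

```java
import java.util.Optional;

class RequiredSupportCheck {
    // Returns an error message for a connector configured with
    // exactly.once.support=required, or empty if the connector reports support.
    static Optional<String> checkRequired(ExactlyOnceSupport reported) {
        if (reported == null) {
            // Method not overridden: the connector may or may not be capable.
            return Optional.of("The connector does not implement the API needed to report "
                + "exactly-once support; it may still support it, but this cannot be verified");
        }
        if (reported == ExactlyOnceSupport.UNSUPPORTED) {
            // Explicit answer from the connector: definitely not capable with this config.
            return Optional.of("The connector does not support exactly-once delivery "
                + "guarantees with the provided configuration");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(checkRequired(null).orElse("ok"));
        System.out.println(checkRequired(ExactlyOnceSupport.UNSUPPORTED).orElse("ok"));
        System.out.println(checkRequired(ExactlyOnceSupport.SUPPORTED).orElse("ok"));
    }
}

// Local redeclaration of the enum introduced by this PR.
enum ExactlyOnceSupport {
    SUPPORTED, UNSUPPORTED
}
```

   If the default were `UNSUPPORTED` instead, the first two branches would collapse into one and users could no longer tell an unimplemented method from a definite "no".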







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r816156264



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.
+     *
+     * @param connectorConfig the configuration that will be used for the connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support with the given
+     * configuration, and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If this method is overridden by a
+     * connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot provide
+     * exactly-once guarantees.
+     * @since 3.2
+     */
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+        return null;

Review comment:
       @hgeraldino do you agree with the rationale here?







[GitHub] [kafka] hgeraldino commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
hgeraldino commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r816451067



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.
+     *
+     * @param connectorConfig the configuration that will be used for the connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support with the given
+     * configuration, and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If this method is overridden by a
+     * connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot provide
+     * exactly-once guarantees.
+     * @since 3.2
+     */
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+        return null;

Review comment:
       Yeah, it makes sense. Sorry for not getting back to you sooner.







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r817294240



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer transaction boundaries when
+ * exactly-once support is enabled.
+ */
+public interface TransactionContext {
+
+    /**
+     * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
+     * is processed.
+     */
+    void commitTransaction();
+
+    /**
+     * Request a transaction commit after a source record is processed. The source record will be the
+     * last record in the committed transaction.
+     * @param record the record to commit the transaction after; may not be null.
+     */
+    void commitTransaction(SourceRecord record);
+
+    /**
+     * Requests a transaction abort the next batch of records from {@link SourceTask#poll()}. All of
+     * the records in that transaction will be discarded and will not appear in a committed transaction.
+     * However, offsets for that transaction will still be committed. If the data should be reprocessed,

Review comment:
       Ah, this is nice, thanks. Was worried about making this part clear to people, that extra bit definitely helps.
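
The commit and abort requests described in the Javadoc above can be pictured with a small standalone sketch. It deliberately uses stand-in types (`DemoRecord`, `DemoTransactionContext`, `RecordingTransactionContext`, `DemoTask`, all hypothetical) rather than the real Connect classes, and `abortTransaction()` is assumed from the Javadoc text, since its signature is not visible in the quoted hunk. The `"END"` and `"CORRUPT"` markers are invented purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for org.apache.kafka.connect.source.SourceRecord; only a value is kept.
final class DemoRecord {
    final String value;

    DemoRecord(String value) {
        this.value = value;
    }
}

// Mirrors the methods quoted in the diff; abortTransaction() is an assumption.
interface DemoTransactionContext {
    void commitTransaction();

    void commitTransaction(DemoRecord record);

    void abortTransaction();
}

// Records requests instead of driving a producer, so the sketch runs standalone.
final class RecordingTransactionContext implements DemoTransactionContext {
    final List<String> requests = new ArrayList<>();

    public void commitTransaction() {
        requests.add("commit-after-batch");
    }

    public void commitTransaction(DemoRecord record) {
        requests.add("commit-after:" + record.value);
    }

    public void abortTransaction() {
        requests.add("abort-batch");
    }
}

final class DemoTask {
    private final DemoTransactionContext tx;

    DemoTask(DemoTransactionContext tx) {
        this.tx = tx;
    }

    // Turns one upstream chunk into a batch and picks the transaction boundary for it.
    List<DemoRecord> poll(List<String> upstream) {
        List<DemoRecord> batch = new ArrayList<>();
        for (String value : upstream) {
            if (value.equals("CORRUPT")) {
                // Ask for the batch being returned to be aborted. Per the Javadoc,
                // its records are discarded but its offsets are still committed.
                tx.abortTransaction();
                return batch;
            }
            DemoRecord record = new DemoRecord(value);
            batch.add(record);
            if (value.equals("END")) {
                // Make this record the last one in the committed transaction.
                tx.commitTransaction(record);
            }
        }
        return batch;
    }
}
```

Because offsets are still committed for an aborted batch, a task that wants the data reprocessed would have to arrange that itself; the sketch does not attempt to show how.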







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r817294374



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.

Review comment:
       [:(](https://www.youtube.com/watch?v=SaVTHG-Ev4k)







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r819951647



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer transaction boundaries when
+ * exactly-once support is enabled.
+ */
+public interface TransactionContext {
+
+    /**
+     * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
+     * is processed.
+     */
+    void commitTransaction();
+
+    /**
+     * Request a transaction commit after a source record is processed. The source record will be the
+     * last record in the committed transaction.
+     * @param record the record to commit the transaction after; may not be null.
+     */
+    void commitTransaction(SourceRecord record);
+
+    /**
+     * Requests a transaction abort the next batch of records from {@link SourceTask#poll()}. All of

Review comment:
       Ah yep, good catch!







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r824130593



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.

Review comment:
       Oooh, that's an interesting idea.
   
   I think your point about the connector doing extra work is my biggest concern on that front, and it seems to be validated (heh) by how `validate` is designed (we never actually invoke `validate` on the same `Connector` instances that we invoke `start` on at the moment). It may not be very frequent, but there certainly are some connectors out there that do some heavy lifting in `start` that we wouldn't want to do every time during preflight validation. For a practical example, see MirrorMaker2: https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L120-L128
   
   Adding a `configure` method would help with this issue by allowing us to give a long-lived config to a `Connector` that the connector can hold onto across successive invocations of `exactlyOnceSupport` and `canDefineTransactionBoundaries` (and possibly even `validate`). But, it would also complicate some of the existing preflight validation logic in the framework. Right now, we only create a single `Connector` instance per connector type per worker for preflight validation, and all invocations of `validate` and `config` are performed with that instance (see `AbstractHerder` [here](https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L431), [here](https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L664-L666), and [here](https://github.com/apache/kafka/blob/38e3787d760fba6bdac4c91126e87cd7939ae43f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L461-L479)).
   
   Since we use these validation-only `Connector` instances pretty loosely and without any guarantees about order of invocations for `config` and `validate`, it's unlikely that there are connectors out there that store state in either of these methods, so there could be relatively low risk for creating a new validation-only `Connector` instance every time a config has to be validated.
   
   But I have to wonder if we're overthinking things? It seems like we're trying to optimize away from methods that accept a raw config map and instead only provide that config once per round of validation. Is the objective there to create a smoother/better-documented/more-strictly-defined API? Or to improve resource utilization by obviating the need for connector developers to construct `AbstractConfig` subclasses (or equivalents) which parse and validate individual properties at instantiation time?







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r818829273



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.

Review comment:
       Hmmmm... I think this is valid and could be useful but there are a few complications.
   
   If we want to guarantee that the config has been validated first (via `Connector::validate` and possibly with other checks like making sure that the value for `transaction.boundary` is valid), should we then only invoke this method if there are no errors in the connector config?
   
   This might make life easier for connector authors who want to be able to instantiate an `AbstractConfig` subclass for their connector and then make decisions about its exactly-once viability based on the already-parsed values from that config class instead of, like you say, having to duplicate that validation logic in this method.
   
   On the other hand, it'll prevent issues with exactly-once support from surfacing the first time around and will require a follow-up validation attempt to discover any, even if the problems with the connector config don't impact its ability to provide exactly-once guarantees.
   
   Thinking about just this specific use case (instantiating an `AbstractConfig` subclass inside this method), I think it'd be best to do two things:
   1. Add a `catch` clause specifically for `ConfigException` instances around the calls to `Connector::exactlyOnceSupport` and `Connector::canDefineTransactionBoundaries` that gets translated into a specific error message stating that the connector config appears to be invalid and exactly-once support for the connector cannot be determined (instead of the [existing "An unexpected error occurred..." message](https://github.com/C0urante/kafka/blob/c687a12af3ec6838140bedf17efae4e9e5c19df1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L907)).
   2. Add a `try`/`catch` block around the [parsing logic for ConnectorTransactionBoundaries](https://github.com/C0urante/kafka/blob/69d63eda9c96bc76ce29b07c43f0e92c776fa5fb/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java#L67) that translates an `IllegalArgumentException` into a `ConfigException`, so that connector authors can freely invoke `ConnectorTransactionBoundaries::fromProperty` on their connector config inside `SourceConnector::exactlyOnceSupport` and, if there's an issue, it'll be translated into a helpful error message to the user. And if the connector transaction boundary style isn't relevant at all to its exactly-once support, we won't double-ding the connector config with two error messages related to the `transaction.boundary` property.
   
   Do you think that this is a reasonable way to handle that specific connector development style? And are there other development styles that we should aim to accommodate?







[GitHub] [kafka] hgeraldino commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
hgeraldino commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r812427100



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.
+     *
+     * @param connectorConfig the configuration that will be used for the connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support with the given
+     * configuration, and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If this method is overridden by a
+     * connector, should not be {@code null}, but if {@code null}, it will be assumed that the connector cannot provide
+     * exactly-once guarantees.
+     * @since 3.2
+     */
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+        return null;

Review comment:
       Wouldn't it make sense to default this to ExactlyOnceSupport#UNSUPPORTED (like `canDefineTransactionBoundaries` is)?
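
For context on the question above: the quoted Javadoc says a `null` return is treated as "cannot provide exactly-once guarantees". One way a runtime could still keep `null` distinguishable from an explicit `UNSUPPORTED` is sketched below. This is only an illustration under assumptions: `DemoExactlyOnceSupport`, `NullAwarePreflight`, and the message wording are hypothetical stand-ins, not the framework's actual code.

```java
// Stand-in for org.apache.kafka.connect.source.ExactlyOnceSupport.
enum DemoExactlyOnceSupport { SUPPORTED, UNSUPPORTED }

final class NullAwarePreflight {
    // Returns a validation message; an empty string means the check passed.
    static String describe(DemoExactlyOnceSupport support) {
        if (support == null) {
            // Typically means the default implementation was not overridden:
            // treat it as unsupported, but tell the user why.
            return "connector does not declare exactly-once support; assuming it cannot provide it";
        }
        switch (support) {
            case SUPPORTED:
                return "";
            case UNSUPPORTED:
                return "connector cannot provide exactly-once support with this configuration";
            default:
                throw new IllegalStateException("unexpected value: " + support);
        }
    }
}
```

With a default of `UNSUPPORTED`, the first two failure cases would collapse into one and the user-facing message could not tell them apart.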







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r818829273



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.

Review comment:
       Hmmmm... I think this is valid and could be useful but there are a few complications.
   
   If we want to guarantee that the config has been validated first (via `Connector::validate` and possibly with other checks like making sure that the value for `transaction.boundary` is valid), should we then only invoke this method if there are no errors in the connector config?
   
   This might make life easier for connector authors who want to be able to instantiate an `AbstractConfig` subclass for their connector and then make decisions about its exactly-once viability based on the already-parsed values from that config class instead of, like you say, having to duplicate that validation logic in this method.
   
   On the other hand, it'll prevent issues with exactly-once support from surfacing the first time around and will require a follow-up validation attempt to discover any, even if the problems with the connector config don't impact its ability to provide exactly-once guarantees.
   
   Thinking about just this specific use case (instantiating an `AbstractConfig` subclass inside this method), I think it'd be best to do two things:
   1. Add a `catch` clause specifically for `ConfigException` instances around the calls to `Connector::exactlyOnceSupport` and `Connector::canDefineTransactionBoundaries` that gets translated into a specific error message stating that the connector config appears to be invalid and exactly-once support for the connector cannot be determined (instead of the [existing "An unexpected error occurred..." message](https://github.com/C0urante/kafka/blob/c687a12af3ec6838140bedf17efae4e9e5c19df1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L907)). This should allow connector authors to instantiate their specific `AbstractConfig` subclass within this method and not have to worry about running into something like [KAFKA-13327](https://issues.apache.org/jira/browse/KAFKA-13327), which causes 500 errors to be thrown during some connector config validations when it's possible (and a much better UX) to add an error message to the validated config.
   2. Add a `try`/`catch` block around the [parsing logic for ConnectorTransactionBoundaries](https://github.com/C0urante/kafka/blob/69d63eda9c96bc76ce29b07c43f0e92c776fa5fb/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java#L67) that translates an `IllegalArgumentException` into a `ConfigException`, so that connector authors can freely invoke `ConnectorTransactionBoundaries::fromProperty` on their connector config inside `SourceConnector::exactlyOnceSupport` and, if there's an issue, it'll be translated into a helpful error message to the user. And if the connector transaction boundary style isn't relevant at all to its exactly-once support, we won't double-ding the connector config with two error messages related to the `transaction.boundary` property.
   
   Do you think that this is a reasonable way to handle that specific connector development style? And are there other development styles that we should aim to accommodate?
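
The two proposed changes can be sketched roughly as follows. This is a minimal standalone illustration, not the framework code: `DemoConfigException` stands in for Kafka's `ConfigException`, `DemoBoundary` for the transaction boundary enum (values taken from KIP-618), and `DemoPreflight.check` and its messages are hypothetical.

```java
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

// Stand-in for org.apache.kafka.common.config.ConfigException.
final class DemoConfigException extends RuntimeException {
    DemoConfigException(String message) {
        super(message);
    }
}

enum DemoSupport { SUPPORTED, UNSUPPORTED }

enum DemoBoundary {
    POLL, INTERVAL, CONNECTOR;

    // Point 2: report a bad property value as a config error
    // instead of letting IllegalArgumentException escape.
    static DemoBoundary fromProperty(String property) {
        if (property == null) {
            throw new DemoConfigException("transaction.boundary may not be null");
        }
        try {
            return DemoBoundary.valueOf(property.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new DemoConfigException("Invalid value for transaction.boundary: " + property);
        }
    }
}

final class DemoPreflight {
    // Point 1: a config error thrown by the connector becomes a targeted
    // validation message rather than a generic "unexpected error".
    static String check(Function<Map<String, String>, DemoSupport> connector,
                        Map<String, String> config) {
        try {
            DemoSupport support = connector.apply(config);
            return support == DemoSupport.SUPPORTED
                    ? "ok"
                    : "connector does not support exactly-once with this config";
        } catch (DemoConfigException e) {
            return "config appears invalid; exactly-once support cannot be determined: "
                    + e.getMessage();
        }
    }
}
```

A connector that calls `DemoBoundary.fromProperty` on a bad value then produces one readable validation message instead of an unexpected failure.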







[GitHub] [kafka] C0urante commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r819945019



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -16,17 +16,63 @@
  */
 package org.apache.kafka.connect.source;
 
-import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.connect.connector.Task;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * The configuration key that determines how source tasks will define transaction boundaries
+     * when exactly-once support is enabled.
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";

Review comment:
       I wanted to define the `TransactionBoundary` enum somewhere public (i.e., accessible by connector authors) so that they could use it for things like the `SourceConnector::exactlyOnceSupport` method in cases where connectors need to be able to define their own transaction boundaries in order to provide exactly-once support. Seems more ergonomic to be able to do something like `TransactionBoundary.fromProperty(props.get(TRANSACTION_BOUNDARY_CONFIG))` in that case.
   
   Given that, it seemed like a toss-up between `SourceConnector` and `SourceTask`. The former felt a little more intuitive (tasks probably have less reason to need to know about their transaction boundary style beyond the return value of `SourceTaskContext::transactionContext`), but the latter lines up better with the precedent set by the topics-related properties for sink connectors, which are [defined in the `SinkTask` class](https://github.com/apache/kafka/blob/3be978464ca01c29241954516b4d952f69d4e16d/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L56-L70).
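
As a rough illustration of the ergonomics described above, here is a standalone sketch of a connector-side check. All types are stand-ins (`SketchTransactionBoundary`, `SketchExactlyOnceSupport`, `SketchConnector` are hypothetical); the enum values and the `poll` fallback follow KIP-618 and are assumptions as far as this PR is concerned. The connector's rule, "only exactly-once safe when it defines its own transaction boundaries", is invented for the example.

```java
import java.util.Locale;
import java.util.Map;

// Stand-in for the public transaction boundary enum; values follow KIP-618.
enum SketchTransactionBoundary {
    POLL, INTERVAL, CONNECTOR;

    // Case-insensitive parse of the raw property value.
    static SketchTransactionBoundary fromProperty(String property) {
        return SketchTransactionBoundary.valueOf(property.trim().toUpperCase(Locale.ROOT));
    }
}

// Stand-in for org.apache.kafka.connect.source.ExactlyOnceSupport.
enum SketchExactlyOnceSupport { SUPPORTED, UNSUPPORTED }

final class SketchConnector {
    // Same key as SourceTask.TRANSACTION_BOUNDARY_CONFIG in the diff.
    static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";

    // Hypothetical rule: this connector is only exactly-once safe when it
    // controls its own transaction boundaries.
    static SketchExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
        // Assumed fallback when the property is absent.
        String raw = props.getOrDefault(TRANSACTION_BOUNDARY_CONFIG, "poll");
        return SketchTransactionBoundary.fromProperty(raw) == SketchTransactionBoundary.CONNECTOR
                ? SketchExactlyOnceSupport.SUPPORTED
                : SketchExactlyOnceSupport.UNSUPPORTED;
    }
}
```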







[GitHub] [kafka] mimaison commented on a change in pull request #11773: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618)

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #11773:
URL: https://github.com/apache/kafka/pull/11773#discussion_r819517322



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer transaction boundaries when
+ * exactly-once support is enabled.
+ */
+public interface TransactionContext {
+
+    /**
+     * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
+     * is processed.
+     */
+    void commitTransaction();
+
+    /**
+     * Request a transaction commit after a source record is processed. The source record will be the
+     * last record in the committed transaction.
+     * @param record the record to commit the transaction after; may not be null.
+     */
+    void commitTransaction(SourceRecord record);
+
+    /**
+     * Requests a transaction abort the next batch of records from {@link SourceTask#poll()}. All of

Review comment:
       Should it be `Requests a transaction abort after the next batch of records`?

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -16,17 +16,63 @@
  */
 package org.apache.kafka.connect.source;
 
-import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.connect.connector.Task;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * The configuration key that determines how source tasks will define transaction boundaries
+     * when exactly-once support is enabled.
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";

Review comment:
       Is there a reason this config is defined here? Is it just temporary? (I've not looked at the remaining PRs yet)

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,46 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.

Review comment:
       To be honest, I'd be in favor of having stronger guarantees on the order of method calls and on the overall lifecycle of plugins. At the moment, connectors can't assume very much and we get weird behaviors like those described in [KIP-419](https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped). At least here, I like that you were explicit in the Javadoc!



