Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/28 08:19:42 UTC

[GitHub] [kafka] yashmayya opened a new pull request, #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

yashmayya opened a new pull request, #12450:
URL: https://github.com/apache/kafka/pull/12450

   
   - https://issues.apache.org/jira/browse/KAFKA-13809 : The FileStream connectors don't propagate all connector configs to their tasks.
   - https://issues.apache.org/jira/browse/KAFKA-9228 : Client overrides, converter configs, and SMT configs may not be propagated to tasks on connector config updates. This isn't an issue for most connectors because their implementations propagate the connector configs to the tasks. This isn't the case for the FileStream connectors, however. 
   - This PR updates the FileStream connectors to be in line with how most other connectors generate task configs (which is a good thing because these connectors are intended to be example connector implementations), and also works around the bug from https://issues.apache.org/jira/browse/KAFKA-9228
   - Minor: Also update the inaccurate source and sink connector Javadocs to align with the [source](https://github.com/apache/kafka/blob/0c5f5a7f8b3628e991459ba9cff414c675676b8b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java#L39-L41) and [sink](https://github.com/apache/kafka/blob/0c5f5a7f8b3628e991459ba9cff414c675676b8b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java#L36-L38) task Javadocs
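
The difference between the two styles of task config generation can be sketched in plain Java. This is only an illustration, not the FileStream connector code: `TaskConfigSketch`, `cherryPicked`, `passThrough` and `updateIsVisible` are hypothetical names, no Kafka classes are used, and the sketch assumes (as a simplification of the KAFKA-9228 behaviour) that a connector config update is only noticed when the generated task configs differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a connector's taskConfigs logic; not Kafka code.
class TaskConfigSketch {

    // Old style: copy only the keys the connector itself knows about.
    static List<Map<String, String>> cherryPicked(Map<String, String> connectorProps) {
        Map<String, String> config = new HashMap<>();
        String file = connectorProps.get("file");
        if (file != null) {
            config.put("file", file);
        }
        config.put("topic", connectorProps.get("topic"));
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(config);
        return configs;
    }

    // New style: hand the full connector configuration to the single task.
    static List<Map<String, String>> passThrough(Map<String, String> connectorProps) {
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(new HashMap<>(connectorProps));
        return configs;
    }

    // Assumption of this sketch: an update is only noticed if the task configs changed.
    static boolean updateIsVisible(List<Map<String, String>> before,
                                   List<Map<String, String>> after) {
        return !before.equals(after);
    }
}
```

With these helpers, changing only a `transforms` property leaves the `cherryPicked` output identical, while the `passThrough` output differs.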
   
   
   - Manually tested the sink and source connectors and ensured that updates to SMT configs are reflected without manually restarting the tasks.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] yashmayya commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r938025993


##########
docs/connect.html:
##########
@@ -423,9 +422,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     <pre class="brush: java;">
 @Override
 public void start(Map&lt;String, String&gt; props) {
-    // The complete version includes error handling as well.
-    filename = props.get(FILE_CONFIG);
-    topic = props.get(TOPIC_CONFIG);
+    this.props = props;

Review Comment:
   I feel like that should be sufficient because most connector implementations probably wouldn't have any fancy logic in the `start` and `taskConfigs` methods that needs to be demonstrated with these example connectors. 





[GitHub] [kafka] yashmayya commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r941408595





##########
docs/connect.html:
##########
@@ -440,19 +443,17 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     <pre class="brush: java;">
 @Override
 public List&lt;Map&lt;String, String&gt;&gt; taskConfigs(int maxTasks) {
+    // This method is where connectors provide the task configs for the tasks that are to be created for this connector.
+    // The length of the list determines the number of tasks that need to be created. The FileStreamSourceConnector, however, is
+    // only capable of spinning up a single task (since there isn't work that can be distributed among multiple tasks).
+    // Note that the task configs could contain configs additional to or different from the connector configs if needed (for instance,
+    // if different tasks have different responsibilities, or if different tasks are meant to process different subsets of the source data stream).

Review Comment:
   Ah, fair point. I've trimmed it down to a couple of sentences. 





[GitHub] [kafka] C0urante merged pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
C0urante merged PR #12450:
URL: https://github.com/apache/kafka/pull/12450




[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r936225081


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -16,40 +16,35 @@
  */
 package org.apache.kafka.connect.file;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
-    public static final String TOPIC_CONFIG = "topic";
-    public static final String FILE_CONFIG = "file";
-    public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
+    static final String TOPIC_CONFIG = "topic";
+    static final String FILE_CONFIG = "file";
+    static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final int DEFAULT_TASK_BATCH_SIZE = 2000;

Review Comment:
   Why reduce the visibility of these?



##########
docs/connect.html:
##########
@@ -443,10 +440,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     ArrayList&lt;Map&lt;String, String&gt;&gt; configs = new ArrayList&lt;&gt;();
     // Only one input stream makes sense.
     Map&lt;String, String&gt; config = new HashMap&lt;&gt;();

Review Comment:
   Can we remove this line now?



##########
docs/connect.html:
##########
@@ -466,23 +460,26 @@ <h5><a id="connect_taskexample" href="#connect_taskexample">Task Example - Sourc
 
     <pre class="brush: java;">
 public class FileStreamSourceTask extends SourceTask {
-    String filename;
-    InputStream stream;
-    String topic;
+    private String filename;
+    private InputStream stream;
+    private String topic;
+    private int batchSize;
 
     @Override
     public void start(Map&lt;String, String&gt; props) {
-        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
+        AbstractConfig config = new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, props);

Review Comment:
   I think it'd be acceptable to leave this section as-was since the addition of the `AbstractConfig` class and `CONFIG_DEF` field introduces a bit too much new info into the mix that's not directly related to teaching beginners about the `start`/`stop`/`poll` methods.
   
   We also call out that "We'll use pseudo-code to describe most of the implementation, but you can refer to the source code for the full example" for the source task section, so it's less important that we keep the code for the connector and the code in these examples strictly in-sync here.
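
For readers wondering what parsing the props against a config definition gives the task, here is a rough plain-Java stand-in. It is a sketch under stated assumptions, not the actual `FileStreamSourceTask`: `SourceTaskSettings` is a hypothetical class, the key names and the default of 2000 are taken from the connector diff in this thread, and the error handling is invented for illustration.

```java
import java.util.Map;

// Hypothetical holder for parsed task settings; not the actual task class.
class SourceTaskSettings {
    static final int DEFAULT_TASK_BATCH_SIZE = 2000;

    final String filename;  // null is taken to mean "read from standard input"
    final String topic;
    final int batchSize;

    SourceTaskSettings(Map<String, String> props) {
        String parsedTopic = props.get("topic");
        if (parsedTopic == null || parsedTopic.isEmpty()) {
            // Illustrative error handling; a real config class reports its own exception type.
            throw new IllegalArgumentException("Missing required config: topic");
        }
        String rawBatchSize = props.get("batch.size");
        int parsedBatchSize = rawBatchSize == null
                ? DEFAULT_TASK_BATCH_SIZE
                : Integer.parseInt(rawBatchSize.trim());

        this.filename = props.get("file");
        this.topic = parsedTopic;
        this.batchSize = parsedBatchSize;
    }
}
```

The point of the sketch is that defaults and required-value checks live in one place, so the task's `start` method only reads already-typed values.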



##########
docs/connect.html:
##########
@@ -423,9 +422,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     <pre class="brush: java;">
 @Override
 public void start(Map&lt;String, String&gt; props) {
-    // The complete version includes error handling as well.
-    filename = props.get(FILE_CONFIG);
-    topic = props.get(TOPIC_CONFIG);
+    this.props = props;

Review Comment:
   This feels a bit too much like magic boilerplate now. I imagine that if I were reading these docs for the first time to get up to speed with the connector API, it wouldn't really help me understand what the purposes of the `start` and `taskConfigs` methods are.
   
   I've noticed recently that we call out in the source task section that the code snippets may differ from the actual source code for the task. I'd be tempted to leave this as-is and add a similar disclaimer for this section, except it'd run the risk that users might use the same style, which would leave their connectors susceptible to KAFKA-9228.
   
   Can you think of a way to keep the source code changes here (i.e., storing `props` instead of just `filename` and `topic`), but also give some idea to users about what they can/should be doing in `start`?
   
   Some rejected alternatives I've thought of include:
   - Validating that the file exists (bad practice, since in most cases this should either be done in a [custom config validator](https://kafka.apache.org/32/javadoc/org/apache/kafka/common/config/ConfigDef.Validator.html) or on task startup)
   - Waiting for the file to exist, then [requesting a task reconfiguration](https://kafka.apache.org/32/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()) (too complicated and relies on concepts that should be introduced later on)
   - Logging the config props (bad practice, can reveal secrets in plaintext, should never be done)
   
   The best I can think of is maybe just adding a log line saying "Starting file source connector that will read from &lt;filename&gt;", but that feels a little too contrived... any thoughts?
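
To make the "custom config validator" alternative above concrete, here is a minimal plain-Java sketch. `SketchValidator` and `ExistingFileValidator` are hypothetical names: the interface only imitates the general shape of a validator (inspect one named value, throw if it is unacceptable) and is not the Kafka interface itself. Treating a null value as "use standard input" is an assumption carried over from the connector's config documentation.

```java
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical stand-in mirroring the shape of a config validator; not Kafka code.
interface SketchValidator {
    void ensureValid(String name, Object value);
}

// Accepts null (interpreted here as "use standard input") or the path of an existing regular file.
class ExistingFileValidator implements SketchValidator {
    @Override
    public void ensureValid(String name, Object value) {
        if (value == null) {
            return;
        }
        String path = value.toString();
        if (!Files.isRegularFile(Paths.get(path))) {
            throw new IllegalArgumentException(
                    "Invalid value for " + name + ": " + path + " is not an existing file");
        }
    }
}
```

Keeping the check in a validator means a bad value is rejected when the config is validated rather than inside the connector's `start` method.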



##########
docs/connect.html:
##########
@@ -443,10 +440,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     ArrayList&lt;Map&lt;String, String&gt;&gt; configs = new ArrayList&lt;&gt;();
     // Only one input stream makes sense.
     Map&lt;String, String&gt; config = new HashMap&lt;&gt;();
-    if (filename != null)
-        config.put(FILE_CONFIG, filename);
-    config.put(TOPIC_CONFIG, topic);
-    configs.add(config);
+    configs.add(props);
     return configs;
 }</pre>
 

Review Comment:
   While we're here, can we move the paragraph below starting with "Although not used in this example, `SourceTask` also provides two APIs to commit offsets" into the source task section, after the paragraph starting with "Note that this implementation uses the normal Java `InputStream` interface"? It describes the behavior of the task rather than the connector, so it doesn't belong in this section.



##########
docs/connect.html:
##########
@@ -403,12 +403,11 @@ <h4><a id="connect_developing" href="#connect_developing">Developing a Simple Co
 
     <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector Example</a></h5>
 
-    <p>We'll cover the <code>SourceConnector</code> as a simple example. <code>SinkConnector</code> implementations are very similar. Start by creating the class that inherits from <code>SourceConnector</code> and add a couple of fields that will store parsed configuration information (the filename to read from and the topic to send data to):</p>
+    <p>We'll cover the <code>SourceConnector</code> as a simple example. <code>SinkConnector</code> implementations are very similar. Start by creating the class that inherits from <code>SourceConnector</code> and add a field that will store the configuration information to be propagated to the task(s) (the topic to send data to, and optionally - the filename to read from and the maximum batch size):</p>
 
     <pre class="brush: java;">
 public class FileStreamSourceConnector extends SourceConnector {
-    private String filename;
-    private String topic;</pre>
+    private Map&lt;String, String&gt;;</pre>

Review Comment:
   Missing variable name?





[GitHub] [kafka] yashmayya commented on pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
yashmayya commented on PR #12450:
URL: https://github.com/apache/kafka/pull/12450#issuecomment-1197822958

   @C0urante could you please take a look at this small change whenever you get a chance?




[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r940599072


##########
docs/connect.html:
##########
@@ -443,10 +440,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     ArrayList&lt;Map&lt;String, String&gt;&gt; configs = new ArrayList&lt;&gt;();
     // Only one input stream makes sense.
     Map&lt;String, String&gt; config = new HashMap&lt;&gt;();
-    if (filename != null)
-        config.put(FILE_CONFIG, filename);
-    config.put(TOPIC_CONFIG, topic);
-    configs.add(config);
+    configs.add(props);
     return configs;
 }</pre>
 

Review Comment:
   🙏 thanks!





[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r940601003


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java:
##########
@@ -25,21 +24,19 @@
 import org.apache.kafka.connect.sink.SinkConnector;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports both source and
- * sink modes via its 'mode' setting.
+ * Very simple sink connector that works with stdout or a file.
  */
 public class FileStreamSinkConnector extends SinkConnector {
 
-    public static final String FILE_CONFIG = "file";
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final String FILE_CONFIG = "file";
+    static final ConfigDef CONFIG_DEF = new ConfigDef()

Review Comment:
   (These should be reverted too)





[GitHub] [kafka] yashmayya commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r940880586


##########
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java:
##########
@@ -74,16 +73,25 @@ public void testSinkTasks() {
 
     @Test
     public void testSinkTasksStdout() {
-        sinkProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        sinkProperties.remove(FileStreamSinkConnector.FILE_CONFIG);
         connector.start(sinkProperties);
         List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
         assertEquals(1, taskConfigs.size());
-        assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
+        assertNull(taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
     }
 
     @Test
     public void testTaskClass() {
         connector.start(sinkProperties);
         assertEquals(FileStreamSinkTask.class, connector.taskClass());
     }
+
+    @Test
+    public void testConnectorConfigsPropagateToTaskConfigs() {

Review Comment:
   Sure, makes sense, I've added comments in the source/sink tests for this. I don't feel like it's really necessary to add it in the connectors' `start` methods since we aren't doing anything convoluted there that needs explaining. However, I'd be okay with adding it there as well if you feel strongly about it.
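
The idea behind a test like `testConnectorConfigsPropagateToTaskConfigs` can be sketched without any Kafka or JUnit classes. Everything below is hypothetical (`PassThroughConnectorSketch`, `PropagationCheck`) and only imitates the `start`/`taskConfigs` pair; it is not the test added in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical connector-shaped class; it only imitates start/taskConfigs.
class PassThroughConnectorSketch {
    private Map<String, String> props;

    void start(Map<String, String> props) {
        this.props = props;
    }

    List<Map<String, String>> taskConfigs(int maxTasks) {
        // Only one task makes sense for a single file, whatever maxTasks is.
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(props);
        return configs;
    }
}

class PropagationCheck {
    // Returns true if every connector property, including ones the connector
    // itself never reads, shows up unchanged in the single task config.
    static boolean propagatesEverything(Map<String, String> connectorProps) {
        PassThroughConnectorSketch connector = new PassThroughConnectorSketch();
        connector.start(connectorProps);
        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
        return taskConfigs.size() == 1 && taskConfigs.get(0).equals(connectorProps);
    }
}
```

A check of this kind is most useful when the input includes a property the connector never reads itself (for example a `transforms` key), since that is exactly what a cherry-picking implementation would drop.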



##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -16,40 +16,35 @@
  */
 package org.apache.kafka.connect.file;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
-    public static final String TOPIC_CONFIG = "topic";
-    public static final String FILE_CONFIG = "file";
-    public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
+    static final String TOPIC_CONFIG = "topic";
+    static final String FILE_CONFIG = "file";
+    static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final int DEFAULT_TASK_BATCH_SIZE = 2000;

Review Comment:
   Hm alright, I guess I hadn't considered that - I've reverted these changes. 



##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java:
##########
@@ -25,21 +24,19 @@
 import org.apache.kafka.connect.sink.SinkConnector;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports both source and
- * sink modes via its 'mode' setting.
+ * Very simple sink connector that works with stdout or a file.
  */
 public class FileStreamSinkConnector extends SinkConnector {
 
-    public static final String FILE_CONFIG = "file";
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final String FILE_CONFIG = "file";
+    static final ConfigDef CONFIG_DEF = new ConfigDef()

Review Comment:
   Reverted the public -> package-private for `FILE_CONFIG` but we need `CONFIG_DEF` to be at least package-private since it is now being used in the task class as well.





[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r940596673


##########
docs/connect.html:
##########
@@ -423,9 +422,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     <pre class="brush: java;">
 @Override
 public void start(Map&lt;String, String&gt; props) {
-    // The complete version includes error handling as well.
-    filename = props.get(FILE_CONFIG);
-    topic = props.get(TOPIC_CONFIG);
+    this.props = props;

Review Comment:
   Yeah, I like the sound of that 👍
   
   One tiny change: I think "Starting file source connector reading from &lt;filename&gt;" would be better than the wording I suggested earlier.
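
A `start` method along the lines agreed on here might look roughly like the sketch below. It is an assumption-laden illustration, not the merged code: `StartLoggingSketch` is a hypothetical class, `System.out` stands in for the connector's logger, and printing "standard input" when no file is configured is a guess at reasonable wording.

```java
import java.util.Map;

// Hypothetical connector-shaped class; the real connector would use its logging framework.
class StartLoggingSketch {
    private Map<String, String> props;
    private String lastLogLine;

    void start(Map<String, String> props) {
        // Keep the whole configuration so it can be handed to the task later.
        this.props = props;
        // Log only the filename, never the full props, which could contain secrets.
        String filename = props.get("file");
        String source = (filename == null || filename.isEmpty()) ? "standard input" : filename;
        lastLogLine = "Starting file source connector reading from " + source;
        System.out.println(lastLogLine);
    }

    Map<String, String> storedProps() {
        return props;
    }

    String lastLogLine() {
        return lastLogLine;
    }
}
```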





[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r941371749


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -20,36 +20,35 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(FileStreamSourceConnector.class);
     public static final String TOPIC_CONFIG = "topic";
     public static final String FILE_CONFIG = "file";
     public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
     public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used")
-        .define(TOPIC_CONFIG, Type.LIST, Importance.HIGH, "The topic to publish data to")
+        .define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to")
         .define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE, Importance.LOW,
                 "The maximum number of records the Source task can read from file one time");

Review Comment:
   ```suggestion
                   "The maximum number of records the source task can read from the file each time it is polled");
   ```



##########
docs/connect.html:
##########
@@ -423,9 +422,13 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     <pre class="brush: java;">
 @Override
 public void start(Map&lt;String, String&gt; props) {
-    // The complete version includes error handling as well.
-    filename = props.get(FILE_CONFIG);
-    topic = props.get(TOPIC_CONFIG);
+    // All initialization logic and setting up of resources goes in this method. The FileStreamSourceConnector, however, doesn't need such logic here.

Review Comment:
   ```suggestion
       // Initialization logic and setting up resources can take place in this method. This connector doesn't need to do any of that, but we do log a helpful message to the user.
   ```



##########
docs/connect.html:
##########
@@ -440,19 +443,17 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     <pre class="brush: java;">
 @Override
 public List&lt;Map&lt;String, String&gt;&gt; taskConfigs(int maxTasks) {
+    // This method is where connectors provide the task configs for the tasks that are to be created for this connector.
+    // The length of the list determines the number of tasks that need to be created. The FileStreamSourceConnector, however, is
+    // only capable of spinning up a single task (since there isn't work that can be distributed among multiple tasks).
+    // Note that the task configs could contain configs additional to or different from the connector configs if needed (for instance,
+    // if different tasks have different responsibilities, or if different tasks are meant to process different subsets of the source data stream).

Review Comment:
   This is really verbose. Can we simplify? I was hoping we'd be able to spell things out here in 1-2 lines.
   
   Keep in mind that the next paragraph provides a lot of useful info already:
   > Even with multiple tasks, this method implementation is usually pretty simple. It just has to determine the number of input tasks, which may require contacting the remote service it is pulling data from, and then divvy them up. Because some patterns for splitting work among tasks are so common, some utilities are provided in ConnectorUtils to simplify these cases.
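
For connectors that, unlike the file source, do have work to divide, the "divvy them up" step from the quoted paragraph can be sketched as follows. `WorkSplitSketch.group` is a hand-rolled, hypothetical helper written for this illustration; it is not the `ConnectorUtils` utility the docs mention, and the choice of contiguous, near-equal groups is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for splitting work among tasks; not the Kafka utility.
class WorkSplitSketch {

    // Splits elements into at most maxTasks contiguous groups whose sizes differ by at most one.
    static <T> List<List<T>> group(List<T> elements, int maxTasks) {
        if (maxTasks <= 0) {
            throw new IllegalArgumentException("maxTasks must be positive");
        }
        int numGroups = Math.min(elements.size(), maxTasks);
        List<List<T>> groups = new ArrayList<>();
        if (numGroups == 0) {
            return groups; // nothing to distribute
        }
        int base = elements.size() / numGroups;
        int extra = elements.size() % numGroups;
        int start = 0;
        for (int i = 0; i < numGroups; i++) {
            // The first `extra` groups take one additional element each.
            int size = base + (i < extra ? 1 : 0);
            groups.add(new ArrayList<>(elements.subList(start, start + size)));
            start += size;
        }
        return groups;
    }
}
```

Each resulting group would then be turned into one task config, so the length of the returned list is the number of tasks requested.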



##########
docs/connect.html:
##########
@@ -609,9 +618,11 @@ <h4><a id="connect_configs" href="#connect_configs">Connect Configuration Valida
     <p>The following code in <code>FileStreamSourceConnector</code> defines the configuration and exposes it to the framework.</p>
 
     <pre class="brush: java;">
-private static final ConfigDef CONFIG_DEF = new ConfigDef()
-    .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
-    .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
+static final ConfigDef CONFIG_DEF = new ConfigDef()
+    .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used")
+    .define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to")
+    .define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE, Importance.LOW,
+        "The maximum number of records the Source task can read from file one time");

Review Comment:
   Nit (I know this was copied from an existing line but it should still be fixed):
   ```suggestion
           "The maximum number of records the source task can read from the file each time it is polled");
   ```





[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r940598535


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -16,40 +16,35 @@
  */
 package org.apache.kafka.connect.file;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
-    public static final String TOPIC_CONFIG = "topic";
-    public static final String FILE_CONFIG = "file";
-    public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
+    static final String TOPIC_CONFIG = "topic";
+    static final String FILE_CONFIG = "file";
+    static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final int DEFAULT_TASK_BATCH_SIZE = 2000;

Review Comment:
   Yeah, we should revert. They may be useful in integration tests and, while they're not strictly public API, I don't see a strong argument for forcing people to use reflection to access now-package-private fields that haven't caused any trouble while they've been public.





[GitHub] [kafka] C0urante commented on pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12450:
URL: https://github.com/apache/kafka/pull/12450#issuecomment-1198659548

   Hi Yash! Thanks for the fix. Patching the KAFKA-9228 gap in the file connectors has been a long time coming. I'm wondering about some of the other changes, though.
   
   We use these connectors as examples for developers on our docs site, and the section at https://kafka.apache.org/32/documentation.html#connect_connectorexample includes several code snippets taken directly from them. It's not strictly necessary to pull out the topic, filename, etc. in the `Connector` implementations, and we get the same behavior (KAFKA-9228 notwithstanding) by passing the original properties through to tasks transparently. Still, pulling them out does help show unfamiliar developers how to implement a connector.
   
   Could you take a look at the docs, and see if there's a way to address KAFKA-9228 (and, if you want, clean up the style of the connectors) and, at the same time, maintain the connectors' value as examples for developers? Feel free to tweak the docs in `docs/connect.html` if necessary.
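
For illustration, the two styles of task config generation discussed in this comment can be modeled in plain Java. This is a minimal sketch, not the connector code itself: the class `TaskConfigStyles` and its method names are hypothetical, no Kafka classes are used, and only the `file` and `topic` keys are taken from the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of two ways to build task configs.
final class TaskConfigStyles {
    static final String FILE_CONFIG = "file";
    static final String TOPIC_CONFIG = "topic";

    // Selective style: copy only the keys the connector knows about.
    // Any other connector property is dropped from the task config.
    static List<Map<String, String>> selective(Map<String, String> connectorProps) {
        Map<String, String> config = new HashMap<>();
        String filename = connectorProps.get(FILE_CONFIG);
        if (filename != null)
            config.put(FILE_CONFIG, filename);
        config.put(TOPIC_CONFIG, connectorProps.get(TOPIC_CONFIG));
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(config);
        return configs;
    }

    // Pass-through style: hand a copy of the full connector properties
    // to the single task, so nothing is lost along the way.
    static List<Map<String, String>> passThrough(Map<String, String> connectorProps) {
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(new HashMap<>(connectorProps));
        return configs;
    }
}
```

With a connector property outside the known keys (for example a `transforms` entry), `selective` omits it from the task config while `passThrough` keeps it, which is the difference this PR relies on.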




[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r940603023


##########
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java:
##########
@@ -74,16 +73,25 @@ public void testSinkTasks() {
 
     @Test
     public void testSinkTasksStdout() {
-        sinkProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        sinkProperties.remove(FileStreamSinkConnector.FILE_CONFIG);
         connector.start(sinkProperties);
         List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
         assertEquals(1, taskConfigs.size());
-        assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
+        assertNull(taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
     }
 
     @Test
     public void testTaskClass() {
         connector.start(sinkProperties);
         assertEquals(FileStreamSinkTask.class, connector.taskClass());
     }
+
+    @Test
+    public void testConnectorConfigsPropagateToTaskConfigs() {

Review Comment:
   Might be worth adding a comment here (and in the `start` implementations for each connector) explaining why we do this and/or referencing the Jira ticket for KAFKA-13809?





[GitHub] [kafka] yashmayya commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r933937606


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -16,40 +16,35 @@
  */
 package org.apache.kafka.connect.file;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
-    public static final String TOPIC_CONFIG = "topic";
-    public static final String FILE_CONFIG = "file";
-    public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
+    static final String TOPIC_CONFIG = "topic";
+    static final String FILE_CONFIG = "file";
+    static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final int DEFAULT_TASK_BATCH_SIZE = 2000;
+
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used")
-        .define(TOPIC_CONFIG, Type.LIST, Importance.HIGH, "The topic to publish data to")
+        .define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to")

Review Comment:
   Not sure why this was defined as a `LIST` config when config values with more than one topic were being rejected anyway. I've updated this to be a `STRING` config, which should be backward compatible for config values that were valid previously.
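
The compatibility claim can be checked with a small plain-Java model. This is a sketch under stated assumptions: `TopicConfigCompat` and its methods are hypothetical, the list parsing is a simplified stand-in for `LIST` handling (split on commas, trim), and the single-topic check mirrors the rejection described in the comment rather than the exact original code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model comparing the old LIST-based and new STRING-based topic handling.
final class TopicConfigCompat {
    // Simplified stand-in for LIST parsing: split on commas and trim each entry.
    static List<String> parseAsList(String raw) {
        List<String> out = new ArrayList<>();
        for (String part : raw.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty())
                out.add(trimmed);
        }
        return out;
    }

    // Old behaviour: parse as a list, then reject anything but exactly one topic.
    static String oldTopic(String raw) {
        List<String> topics = parseAsList(raw);
        if (topics.size() != 1)
            throw new IllegalArgumentException("'topic' requires a single topic, got: " + raw);
        return topics.get(0);
    }

    // New behaviour: treat the value as one non-empty string.
    static String newTopic(String raw) {
        String topic = raw.trim();
        if (topic.isEmpty())
            throw new IllegalArgumentException("'topic' must be non-empty");
        return topic;
    }
}
```

Every value that `oldTopic` accepts yields the same topic from `newTopic`. The reverse does not hold in this model: a comma-separated value that was rejected before is now treated as a single string.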



##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -77,12 +65,7 @@ public Class<? extends Task> taskClass() {
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         ArrayList<Map<String, String>> configs = new ArrayList<>();
         // Only one input stream makes sense.
-        Map<String, String> config = new HashMap<>();
-        if (filename != null)
-            config.put(FILE_CONFIG, filename);
-        config.put(TOPIC_CONFIG, topic);
-        config.put(TASK_BATCH_SIZE_CONFIG, String.valueOf(batchSize));

Review Comment:
   Removed the redundant parsing from `String` to `int` (in `start()`) and then back to `String` here; it's cleaner to use the helper methods from `AbstractConfig` directly in the task class. 
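
To make the redundancy concrete, here is a minimal plain-Java sketch of the round trip. The class `BatchSizeParsing` and its methods are hypothetical; `Integer.parseInt` stands in for the typed accessors that `AbstractConfig` provides, and only the `batch.size` key and its default of 2000 come from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of where the batch size gets parsed.
final class BatchSizeParsing {
    static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
    static final int DEFAULT_TASK_BATCH_SIZE = 2000;

    // Parse the batch size once, falling back to the default when it is absent.
    static int parse(Map<String, String> props) {
        String raw = props.get(TASK_BATCH_SIZE_CONFIG);
        return raw == null ? DEFAULT_TASK_BATCH_SIZE : Integer.parseInt(raw.trim());
    }

    // Old connector-side flow: String -> int when starting, then int -> String
    // again when building the task config, only for the task to parse it once more.
    static Map<String, String> roundTrip(Map<String, String> connectorProps) {
        int batchSize = parse(connectorProps);
        Map<String, String> taskConfig = new HashMap<>();
        taskConfig.put(TASK_BATCH_SIZE_CONFIG, String.valueOf(batchSize));
        return taskConfig;
    }
}
```

Parsing the round-tripped task config gives the same number as parsing the connector properties directly, so the intermediate conversion adds nothing when the properties are passed through unchanged.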





[GitHub] [kafka] yashmayya commented on pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
yashmayya commented on PR #12450:
URL: https://github.com/apache/kafka/pull/12450#issuecomment-1200363697

   Hi Chris! Thanks for taking a look, and also for pointing out that the docs need to be updated - I'd completely missed that code snippets from these FileStream connectors were being used there. I've made some changes, could you PTAL?




[GitHub] [kafka] yashmayya commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r938000827


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -16,40 +16,35 @@
  */
 package org.apache.kafka.connect.file;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
-    public static final String TOPIC_CONFIG = "topic";
-    public static final String FILE_CONFIG = "file";
-    public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
+    static final String TOPIC_CONFIG = "topic";
+    static final String FILE_CONFIG = "file";
+    static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final int DEFAULT_TASK_BATCH_SIZE = 2000;

Review Comment:
   I didn't think we'd ever want to use them outside of the `org.apache.kafka.connect.file` package (even accidentally), so reduced the visibility. I can revert it if you want. 



##########
docs/connect.html:
##########
@@ -403,12 +403,11 @@ <h4><a id="connect_developing" href="#connect_developing">Developing a Simple Co
 
     <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector Example</a></h5>
 
-    <p>We'll cover the <code>SourceConnector</code> as a simple example. <code>SinkConnector</code> implementations are very similar. Start by creating the class that inherits from <code>SourceConnector</code> and add a couple of fields that will store parsed configuration information (the filename to read from and the topic to send data to):</p>
+    <p>We'll cover the <code>SourceConnector</code> as a simple example. <code>SinkConnector</code> implementations are very similar. Start by creating the class that inherits from <code>SourceConnector</code> and add a field that will store the configuration information to be propagated to the task(s) (the topic to send data to, and optionally - the filename to read from and the maximum batch size):</p>
 
     <pre class="brush: java;">
 public class FileStreamSourceConnector extends SourceConnector {
-    private String filename;
-    private String topic;</pre>
+    private Map&lt;String, String&gt;;</pre>

Review Comment:
   Whoops, thanks



##########
docs/connect.html:
##########
@@ -423,9 +422,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     <pre class="brush: java;">
 @Override
 public void start(Map&lt;String, String&gt; props) {
-    // The complete version includes error handling as well.
-    filename = props.get(FILE_CONFIG);
-    topic = props.get(TOPIC_CONFIG);
+    this.props = props;

Review Comment:
   What about having the log line but also putting a comment (only in the docs) that explains that any initialization logic and setting up of resources typically goes in `start` and that none is required for the `FileStream` connector?
   
   And one more in `taskConfigs` that explains that the connector can generate task configs there, and that they could also have configs additional to / different from the connector configs if different tasks have different responsibilities (also with the same disclaimer that this isn't required for the `FileStream` connector)?
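
As a rough illustration of the comments proposed here, the sketch below models a connector in plain Java. Everything in it is an assumption made for the example: the class `ExampleConnectorModel` is hypothetical, it does not extend any Kafka class, and the `task.index` key is invented to show a task-specific setting. Unlike the `FileStream` connectors, where only one task makes sense, this model produces one config per requested task.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a connector's start() and taskConfigs().
final class ExampleConnectorModel {
    private Map<String, String> props;

    void start(Map<String, String> props) {
        // Initialization logic and resource setup typically go here.
        // This model needs none, so it only keeps the properties.
        this.props = props;
    }

    List<Map<String, String>> taskConfigs(int maxTasks) {
        // Task configs are generated here. They may carry settings that are
        // additional to, or different from, the connector configs when tasks
        // have different responsibilities.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<>(props);
            config.put("task.index", String.valueOf(i)); // invented per-task key
            configs.add(config);
        }
        return configs;
    }
}
```

Each generated map starts from the full connector properties and then adds its own entry, so connector-level settings still reach every task.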



##########
docs/connect.html:
##########
@@ -466,23 +460,26 @@ <h5><a id="connect_taskexample" href="#connect_taskexample">Task Example - Sourc
 
     <pre class="brush: java;">
 public class FileStreamSourceTask extends SourceTask {
-    String filename;
-    InputStream stream;
-    String topic;
+    private String filename;
+    private InputStream stream;
+    private String topic;
+    private int batchSize;
 
     @Override
     public void start(Map&lt;String, String&gt; props) {
-        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
+        AbstractConfig config = new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, props);

Review Comment:
   Sure, makes sense, reverted this. 



##########
docs/connect.html:
##########
@@ -443,10 +440,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     ArrayList&lt;Map&lt;String, String&gt;&gt; configs = new ArrayList&lt;&gt;();
     // Only one input stream makes sense.
     Map&lt;String, String&gt; config = new HashMap&lt;&gt;();
-    if (filename != null)
-        config.put(FILE_CONFIG, filename);
-    config.put(TOPIC_CONFIG, topic);
-    configs.add(config);
+    configs.add(props);
     return configs;
 }</pre>
 

Review Comment:
   Makes sense, it looks quite odd and out of place right now.



##########
docs/connect.html:
##########
@@ -443,10 +440,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     ArrayList&lt;Map&lt;String, String&gt;&gt; configs = new ArrayList&lt;&gt;();
     // Only one input stream makes sense.
     Map&lt;String, String&gt; config = new HashMap&lt;&gt;();

Review Comment:
   Thanks, my bad for missing it earlier.


