Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/04 15:34:47 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors

C0urante commented on code in PR #12450:
URL: https://github.com/apache/kafka/pull/12450#discussion_r936225081


##########
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##########
@@ -16,40 +16,35 @@
  */
 package org.apache.kafka.connect.file;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Very simple connector that works with the console. This connector supports both source and
- * sink modes via its 'mode' setting.
+ * Very simple source connector that works with stdin or a file.
  */
 public class FileStreamSourceConnector extends SourceConnector {
-    public static final String TOPIC_CONFIG = "topic";
-    public static final String FILE_CONFIG = "file";
-    public static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    public static final int DEFAULT_TASK_BATCH_SIZE = 2000;
+    static final String TOPIC_CONFIG = "topic";
+    static final String FILE_CONFIG = "file";
+    static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    static final int DEFAULT_TASK_BATCH_SIZE = 2000;

Review Comment:
   Why reduce the visibility of these?



##########
docs/connect.html:
##########
@@ -443,10 +440,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     ArrayList&lt;Map&lt;String, String&gt;&gt; configs = new ArrayList&lt;&gt;();
     // Only one input stream makes sense.
     Map&lt;String, String&gt; config = new HashMap&lt;&gt;();

Review Comment:
   Can we remove this line now?



##########
docs/connect.html:
##########
@@ -466,23 +460,26 @@ <h5><a id="connect_taskexample" href="#connect_taskexample">Task Example - Sourc
 
     <pre class="brush: java;">
 public class FileStreamSourceTask extends SourceTask {
-    String filename;
-    InputStream stream;
-    String topic;
+    private String filename;
+    private InputStream stream;
+    private String topic;
+    private int batchSize;
 
     @Override
     public void start(Map&lt;String, String&gt; props) {
-        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
+        AbstractConfig config = new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, props);

Review Comment:
   I think it'd be acceptable to leave this section as it was, since adding the `AbstractConfig` class and `CONFIG_DEF` field introduces new information that isn't directly related to teaching beginners about the `start`/`stop`/`poll` methods.
   
   We also call out that "We'll use pseudo-code to describe most of the implementation, but you can refer to the source code for the full example" for the source task section, so it's less important to keep the connector code and the code in these examples strictly in sync here.
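
For illustration only, here is a minimal sketch of what a task `start` could look like if it read the propagated values straight from the map, without `AbstractConfig`. The class name `SketchFileSourceTask` is hypothetical and deliberately does not extend the real `SourceTask`, so it compiles without Kafka on the classpath. The key names and the default batch size are copied from the connector diff above; the rest is an assumption, not the code in the PR.

```java
import java.util.Map;

// Hypothetical stand-in for the documented task; it does not extend SourceTask.
class SketchFileSourceTask {
    // Key names and default value as they appear in the connector diff above.
    static final String TOPIC_CONFIG = "topic";
    static final String FILE_CONFIG = "file";
    static final String TASK_BATCH_SIZE_CONFIG = "batch.size";
    static final int DEFAULT_TASK_BATCH_SIZE = 2000;

    String filename;
    String topic;
    int batchSize;

    // Reads each setting directly from the raw task properties.
    void start(Map<String, String> props) {
        filename = props.get(FILE_CONFIG); // may be null when no file is configured
        topic = props.get(TOPIC_CONFIG);
        String rawBatchSize = props.get(TASK_BATCH_SIZE_CONFIG);
        // Fall back to the default when the optional setting is absent.
        batchSize = rawBatchSize == null
                ? DEFAULT_TASK_BATCH_SIZE
                : Integer.parseInt(rawBatchSize);
    }
}
```

The trade-off is that direct reads keep the example short but skip the type checking and defaults that a config definition would otherwise supply.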



##########
docs/connect.html:
##########
@@ -423,9 +422,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     <pre class="brush: java;">
 @Override
 public void start(Map&lt;String, String&gt; props) {
-    // The complete version includes error handling as well.
-    filename = props.get(FILE_CONFIG);
-    topic = props.get(TOPIC_CONFIG);
+    this.props = props;

Review Comment:
   This feels a bit too much like magic boilerplate now. I imagine that if I were reading these docs for the first time to get up to speed with the connector API, it wouldn't really help me understand what the purposes of the `start` and `taskConfigs` methods are.
   
   I've noticed recently that we call out in the source task section that the code snippets may differ from the actual source code for the task. I'd be tempted to leave this as-is and add a similar disclaimer for this section, except it'd run the risk that users might use the same style, which would leave their connectors susceptible to KAFKA-9228.
   
   Can you think of a way to keep the source code changes here (i.e., storing `props` instead of just `filename` and `topic`), but also give some idea to users about what they can/should be doing in `start`?
   
   Some rejected alternatives I've thought of include:
   - Validating that the file exists (bad practice, since in most cases this should either be done in a [custom config validator](https://kafka.apache.org/32/javadoc/org/apache/kafka/common/config/ConfigDef.Validator.html) or on task startup)
   - Waiting for the file to exist, then [requesting a task reconfiguration](https://kafka.apache.org/32/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()) (too complicated and relies on concepts that should be introduced later on)
   - Logging the config props (bad practice, can reveal secrets in plaintext, should never be done)
   
   The best I can think of is maybe just adding a log line saying "Starting file source connector that will read from <filename>", but that feels a little too contrived... any thoughts?
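
To make the idea concrete, here is a rough sketch of a connector that stores the full `props` map, hands it to the task unchanged, and reports only the filename. The class `SketchFileSourceConnector` is hypothetical and does not extend the real `SourceConnector`, so it compiles without Kafka on the classpath. As a further assumption, `start` returns the message a real connector would pass to its logger, which keeps the sketch free of a logging dependency.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a source connector; it does not extend SourceConnector.
class SketchFileSourceConnector {
    static final String FILE_CONFIG = "file";

    // Full connector configuration, kept so it can be handed to tasks unchanged.
    private Map<String, String> props;

    // Mirrors start(): store everything, and report only one non-sensitive value.
    String start(Map<String, String> props) {
        this.props = new HashMap<>(props);
        String filename = props.get(FILE_CONFIG);
        // Never log the whole map: it may contain secrets in plaintext.
        return "Starting file source connector that will read from "
                + (filename == null ? "standard input" : filename);
    }

    // Mirrors taskConfigs(): only one input stream makes sense, so one config.
    List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(new HashMap<>(props));
        return configs;
    }
}
```

Copying the map in both methods is a defensive choice in this sketch, so that later changes by the caller do not alter the stored configuration.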



##########
docs/connect.html:
##########
@@ -443,10 +440,7 @@ <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector
     ArrayList&lt;Map&lt;String, String&gt;&gt; configs = new ArrayList&lt;&gt;();
     // Only one input stream makes sense.
     Map&lt;String, String&gt; config = new HashMap&lt;&gt;();
-    if (filename != null)
-        config.put(FILE_CONFIG, filename);
-    config.put(TOPIC_CONFIG, topic);
-    configs.add(config);
+    configs.add(props);
     return configs;
 }</pre>
 

Review Comment:
   While we're here, can we move the paragraph below starting with "Although not used in this example, `SourceTask` also provides two APIs to commit offsets" into the source task section, after the paragraph starting with "Note that this implementation uses the normal Java `InputStream` interface"? It describes the behavior of the task rather than the connector, so it really should not be in this section.



##########
docs/connect.html:
##########
@@ -403,12 +403,11 @@ <h4><a id="connect_developing" href="#connect_developing">Developing a Simple Co
 
     <h5><a id="connect_connectorexample" href="#connect_connectorexample">Connector Example</a></h5>
 
-    <p>We'll cover the <code>SourceConnector</code> as a simple example. <code>SinkConnector</code> implementations are very similar. Start by creating the class that inherits from <code>SourceConnector</code> and add a couple of fields that will store parsed configuration information (the filename to read from and the topic to send data to):</p>
+    <p>We'll cover the <code>SourceConnector</code> as a simple example. <code>SinkConnector</code> implementations are very similar. Start by creating the class that inherits from <code>SourceConnector</code> and add a field that will store the configuration information to be propagated to the task(s) (the topic to send data to, and optionally - the filename to read from and the maximum batch size):</p>
 
     <pre class="brush: java;">
 public class FileStreamSourceConnector extends SourceConnector {
-    private String filename;
-    private String topic;</pre>
+    private Map&lt;String, String&gt;;</pre>

Review Comment:
   Missing variable name?


