Posted to commits@kafka.apache.org by kk...@apache.org on 2020/06/16 05:21:07 UTC

[kafka] branch 2.6 updated: MINOR: Documentation for KIP-585 (#8839)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new ffa7008  MINOR: Documentation for KIP-585 (#8839)
ffa7008 is described below

commit ffa70080b103ec7531c0352e734454d0483b9945
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Tue Jun 16 06:07:01 2020 +0100

    MINOR: Documentation for KIP-585 (#8839)
    
    * Add documentation for using transformation predicates.
    * Add `PredicateDoc` for generating predicate config docs, following the style of `TransformationDoc`.
    * Fix the header depth mismatch.
    * Avoid generating HTML ids based purely on the config name since these
    are very likely to conflict (e.g. #name). Instead allow passing a function
    which can be used to generate an id from a config key.
    
    The docs have been generated and tested locally.
    
    Reviewer: Konstantine Karantasis <ko...@confluent.io>
---
 build.gradle                                       |  8 ++
 .../org/apache/kafka/common/config/ConfigDef.java  | 34 +++++++--
 .../apache/kafka/connect/tools/PredicateDoc.java   | 85 ++++++++++++++++++++++
 .../kafka/connect/tools/TransformationDoc.java     |  6 +-
 .../transforms/predicates/HasHeaderKey.java        |  1 +
 .../transforms/predicates/RecordIsTombstone.java   |  3 +-
 .../transforms/predicates/TopicNameMatches.java    |  3 +
 docs/connect.html                                  | 69 ++++++++++++++++++
 8 files changed, 200 insertions(+), 9 deletions(-)
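
Note on the id change described above: the following is a minimal, standalone sketch of why a per-plugin id generator avoids clashing anchors. It does not use Kafka classes. The class name HeaderIdSketch and the helper header(...) are hypothetical and exist only for illustration; the format string is modeled on the one in the ConfigDef.toHtml hunk below.

```java
import java.util.function.Function;

public class HeaderIdSketch {

    // Hypothetical helper: renders one header the way the new format string does,
    // taking the depth and the id from the caller instead of hard-coding them.
    static String header(int headerDepth, Function<String, String> idGenerator, String configName) {
        return String.format("<h%1$d><a id=\"%2$s\" href=\"#%2$s\">%3$s</a></h%1$d>",
                headerDepth, idGenerator.apply(configName), configName);
    }

    public static void main(String[] args) {
        String owner = "org.apache.kafka.connect.transforms.predicates.HasHeaderKey";

        // Identity generator: the id is the bare config name, so two plugins
        // that both define "name" would produce the same anchor.
        System.out.println(header(4, Function.identity(), "name"));

        // Prefixing generator: the id is qualified by the owning class name,
        // which is the approach PredicateDoc and TransformationDoc take.
        System.out.println(header(6, key -> owner + "_" + key, "name"));
    }
}
```

With the identity function the anchor is just #name; with the prefixing function it becomes unique per predicate or transformation while the visible header text stays the plain config name.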

diff --git a/build.gradle b/build.gradle
index 65f18e5..38ae8ef 100644
--- a/build.gradle
+++ b/build.gradle
@@ -849,6 +849,7 @@ project(':core') {
                                'genAdminClientConfigDocs', 'genProducerConfigDocs', 'genConsumerConfigDocs',
                                'genKafkaConfigDocs', 'genTopicConfigDocs',
                                ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs',
+                               ':connect:runtime:genConnectPredicateDocs',
                                ':connect:runtime:genSinkConnectorConfigDocs', ':connect:runtime:genSourceConnectorConfigDocs',
                                ':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs', 'genProducerMetricsDocs',
                                ':connect:runtime:genConnectMetricsDocs'], type: Tar) {
@@ -1769,6 +1770,13 @@ project(':connect:runtime') {
     standardOutput = new File(generatedDocsDir, "connect_transforms.html").newOutputStream()
   }
 
+  task genConnectPredicateDocs(type: JavaExec) {
+    classpath = sourceSets.main.runtimeClasspath
+    main = 'org.apache.kafka.connect.tools.PredicateDoc'
+    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
+    standardOutput = new File(generatedDocsDir, "connect_predicates.html").newOutputStream()
+  }
+
   task genConnectMetricsDocs(type: JavaExec) {
     classpath = sourceSets.test.runtimeClasspath
     main = 'org.apache.kafka.connect.runtime.ConnectMetrics'
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index d33a3c4..1e51758 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.config;
 
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.utils.Utils;
@@ -1489,7 +1490,16 @@ public class ConfigDef {
     }
 
     public String toHtml() {
-        return toHtml(Collections.<String, String>emptyMap());
+        return toHtml(Collections.emptyMap());
+    }
+
+    /**
+     * Converts this config into an HTML list that can be embedded into docs.
+     * @param headerDepth The top level header depth in the generated HTML.
+     * @param idGenerator A function for computing the HTML id attribute in the generated HTML from a given config name.
+     */
+    public String toHtml(int headerDepth, Function<String, String> idGenerator) {
+        return toHtml(headerDepth, idGenerator, Collections.emptyMap());
     }
 
     /**
@@ -1497,9 +1507,23 @@ public class ConfigDef {
      * If <code>dynamicUpdateModes</code> is non-empty, a "Dynamic Update Mode" label
      * will be included in the config details with the value of the update mode. Default
      * mode is "read-only".
-     * @param dynamicUpdateModes Config name -&gt; update mode mapping
+     * @param dynamicUpdateModes Config name -&gt; update mode mapping.
      */
     public String toHtml(Map<String, String> dynamicUpdateModes) {
+        return toHtml(4, Function.identity(), dynamicUpdateModes);
+    }
+
+    /**
+     * Converts this config into an HTML list that can be embedded into docs.
+     * If <code>dynamicUpdateModes</code> is non-empty, a "Dynamic Update Mode" label
+     * will be included in the config details with the value of the update mode. Default
+     * mode is "read-only".
+     * @param headerDepth The top level header depth in the generated HTML.
+     * @param idGenerator A function for computing the HTML id attribute in the generated HTML from a given config name.
+     * @param dynamicUpdateModes Config name -&gt; update mode mapping.
+     */
+    public String toHtml(int headerDepth, Function<String, String> idGenerator,
+                         Map<String, String> dynamicUpdateModes) {
         boolean hasUpdateModes = !dynamicUpdateModes.isEmpty();
         List<ConfigKey> configs = sortedConfigs();
         StringBuilder b = new StringBuilder();
@@ -1509,9 +1533,9 @@ public class ConfigDef {
                 continue;
             }
             b.append("<li>\n");
-            b.append(String.format("<h4>" +
-                    "<a id=\"%1$s\" href=\"#%1$s\">%1$s</a>" +
-                    "</h4>%n", key.name));
+            b.append(String.format("<h%1$d>" +
+                    "<a id=\"%2$s\" href=\"#%2$s\">%3$s</a>" +
+                    "</h%1$d>%n", headerDepth, idGenerator.apply(key.name), key.name));
             b.append("<p>");
             b.append(key.documentation.replaceAll("\n", "<br>"));
             b.append("</p>\n");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
new file mode 100644
index 0000000..d4399d6
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PredicateDoc {
+
+    private static final class DocInfo {
+        final String predicateName;
+        final String overview;
+        final ConfigDef configDef;
+
+        private <P extends Predicate<?>> DocInfo(Class<P> predicateClass, String overview, ConfigDef configDef) {
+            this.predicateName = predicateClass.getName();
+            this.overview = overview;
+            this.configDef = configDef;
+        }
+    }
+
+    private static final List<DocInfo> PREDICATES;
+    static {
+        List<DocInfo> collect = new Plugins(Collections.emptyMap()).predicates().stream()
+            .map(p -> {
+                try {
+                    String overviewDoc = (String) p.pluginClass().getDeclaredField("OVERVIEW_DOC").get(null);
+                    ConfigDef configDef = (ConfigDef) p.pluginClass().getDeclaredField("CONFIG_DEF").get(null);
+                    return new DocInfo(p.pluginClass(), overviewDoc, configDef);
+                } catch (ReflectiveOperationException e) {
+                    throw new RuntimeException("Predicate class " + p.pluginClass().getName() + " lacks either a `public static final String OVERVIEW_DOC` or `public static final ConfigDef CONFIG_DEF`");
+                }
+            })
+            .collect(Collectors.toList());
+        collect.sort(Comparator.comparing(docInfo -> docInfo.predicateName));
+        PREDICATES = collect;
+    }
+
+    private static void printPredicateHtml(PrintStream out, DocInfo docInfo) {
+        out.println("<div id=\"" + docInfo.predicateName + "\">");
+
+        out.print("<h5>");
+        out.print(docInfo.predicateName);
+        out.println("</h5>");
+
+        out.println(docInfo.overview);
+
+        out.println("<p/>");
+
+        out.println(docInfo.configDef.toHtml(6, key -> docInfo.predicateName + "_" + key));
+
+        out.println("</div>");
+    }
+
+    private static void printHtml(PrintStream out) {
+        for (final DocInfo docInfo : PREDICATES) {
+            printPredicateHtml(out, docInfo);
+        }
+    }
+
+    public static void main(String... args) {
+        printHtml(System.out);
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
index 3809d39..82c2663 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
@@ -76,18 +76,18 @@ public class TransformationDoc {
 
         out.println("<p/>");
 
-        out.println(docInfo.configDef.toHtml());
+        out.println(docInfo.configDef.toHtml(6, key -> docInfo.transformationName + "_"  + key));
 
         out.println("</div>");
     }
 
-    private static void printHtml(PrintStream out) throws NoSuchFieldException, IllegalAccessException, InstantiationException {
+    private static void printHtml(PrintStream out) {
         for (final DocInfo docInfo : TRANSFORMATIONS) {
             printTransformationHtml(out, docInfo);
         }
     }
 
-    public static void main(String... args) throws Exception {
+    public static void main(String... args) {
         printHtml(System.out);
     }
 
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
index 03f324e..f15d426 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
@@ -31,6 +31,7 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
 public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
 
     private static final String NAME_CONFIG = "name";
+    public static final String OVERVIEW_DOC = "A predicate which is true for records with at least one header with the configured name.";
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
             new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
index e39591c..4a21eac 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
@@ -27,7 +27,8 @@ import org.apache.kafka.connect.connector.ConnectRecord;
  */
 public class RecordIsTombstone<R extends ConnectRecord<R>> implements Predicate<R> {
 
-    private static final ConfigDef CONFIG_DEF = new ConfigDef();
+    public static final String OVERVIEW_DOC = "A predicate which is true for records which are tombstones (i.e. have null value).";
+    public static final ConfigDef CONFIG_DEF = new ConfigDef();
 
     @Override
     public ConfigDef config() {
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
index fba8d51..3ea8f1a 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
@@ -33,6 +33,9 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
 public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R> {
 
     private static final String PATTERN_CONFIG = "pattern";
+
+    public static final String OVERVIEW_DOC = "A predicate which is true for records with a topic name that matches the configured regular expression.";
+
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(PATTERN_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
             ConfigDef.CompositeValidator.of(new ConfigDef.NonEmptyString(), new RegexValidator()),
diff --git a/docs/connect.html b/docs/connect.html
index 99ec177..18ab5fb 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -167,6 +167,8 @@
     </pre>
 
     <p>You can see that the lines we've read are now part of a JSON map, and there is an extra field with the static value we specified. This is just one example of what you can do with transformations.</p>
+    
+    <h5><a id="connect_included_transformation" href="#connect_included_transformation">Included transformations</a></h5>
 
     <p>Several widely-applicable data and routing transformations are included with Kafka Connect:</p>
 
@@ -180,6 +182,7 @@
         <li>SetSchemaMetadata - modify the schema name or version</li>
         <li>TimestampRouter - Modify the topic of a record based on original topic and timestamp. Useful when using a sink that needs to write to different tables or indexes based on timestamps</li>
         <li>RegexRouter - modify the topic of a record based on original topic, replacement string and a regular expression</li>
+        <li>Filter - Removes messages from all further processing. This is used with a <a href="#connect_predicates">predicate</a> to selectively filter certain messages.</li>
     </ul>
 
     <p>Details on how to configure each transformation are listed below:</p>
@@ -187,6 +190,72 @@
 
     <!--#include virtual="generated/connect_transforms.html" -->
 
+
+    <h5><a id="connect_predicates" href="#connect_predicates">Predicates</a></h5>
+
+    <p>Transformations can be configured with predicates so that the transformation is applied only to messages which satisfy some condition. In particular, when combined with the <b>Filter</b> transformation, predicates can be used to selectively filter out certain messages.</p>
+
+    <p>Predicates are specified in the connector configuration.</p>
+
+    <ul>
+        <li><code>predicates</code> - Set of aliases for the predicates to be applied to some of the transformations.</li>
+        <li><code>predicates.$alias.type</code> - Fully qualified class name for the predicate.</li>
+        <li><code>predicates.$alias.$predicateSpecificConfig</code> - Configuration properties for the predicate.</li>
+    </ul>
+
+    <p>All transformations have the implicit config properties <code>predicate</code> and <code>negate</code>. A particular predicate is associated with a transformation by setting the transformation's <code>predicate</code> config to the predicate's alias. The predicate's value can be reversed using the <code>negate</code> configuration property.</p>
+
+    <p>For example, suppose you have a source connector which produces messages to many different topics and you want to:</p>
+    <ul>
+        <li>filter out the messages in the 'foo' topic entirely</li>
+        <li>apply the ExtractField transformation with the field name 'other_field' to records in all topics <i>except</i> the topic 'bar'</li>
+    </ul>
+
+    <p>To do this we need first to filter out the records destined for the topic 'foo'. The Filter transformation removes records from further processing, and can use the TopicNameMatches predicate to apply the transformation only to records in topics which match a certain regular expression. TopicNameMatches's only configuration property is <code>pattern</code> which is a Java regular expression for matching against the topic name. The configuration would look like this:</p>
+
+    <pre class="brush: text;">
+        transforms=Filter
+        transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
+        transforms.Filter.predicate=IsFoo
+
+        predicates=IsFoo
+        predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
+        predicates.IsFoo.pattern=foo
+    </pre>
+        
+    <p>Next we need to apply ExtractField only when the topic name of the record is not 'bar'. We can't just use TopicNameMatches directly, because that would apply the transformation to matching topic names, not topic names which do <i>not</i> match. The transformation's implicit <code>negate</code> config property allows us to invert the set of records which a predicate matches. Adding the configuration for this to the previous example, we arrive at:</p>
+
+    <pre class="brush: text;">
+        transforms=Filter,Extract
+        transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
+        transforms.Filter.predicate=IsFoo
+
+        transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
+        transforms.Extract.field=other_field
+        transforms.Extract.predicate=IsBar
+        transforms.Extract.negate=true
+
+        predicates=IsFoo,IsBar
+        predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
+        predicates.IsFoo.pattern=foo
+
+        predicates.IsBar.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
+        predicates.IsBar.pattern=bar
+    </pre>
+
+    <p>Kafka Connect includes the following predicates:</p>
+
+    <ul>
+        <li><code>TopicNameMatches</code> - matches records in a topic with a name matching a particular Java regular expression.</li>
+        <li><code>HasHeaderKey</code> - matches records which have a header with the given key.</li>
+        <li><code>RecordIsTombstone</code> - matches tombstone records, that is, records with a null value.</li>
+    </ul>
+
+    <p>Details on how to configure each predicate are listed below:</p>
+    
+    <!--#include virtual="generated/connect_predicates.html" -->
+
+
     <h4><a id="connect_rest" href="#connect_rest">REST API</a></h4>
 
     <p>Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. The REST API server can be configured using the <code>listeners</code> configuration option.