Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/13 22:39:28 UTC

[GitHub] [kafka] jeqo opened a new pull request, #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

jeqo opened a new pull request, #12637:
URL: https://github.com/apache/kafka/pull/12637

   KIP-821 initial implementation.
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

Posted by GitBox <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r975569257


##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathTest.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.junit.jupiter.api.Test;
+
+class FieldPathTest {
+    final static String[] EMPTY_PATH = new String[]{};
+
+    @Test void shouldBuildV1WithDotsAndBacktickPair() {
+        assertArrayEquals(new String[] {"foo.bar.baz"}, FieldPath.ofV1("foo.bar.baz").path());
+        assertArrayEquals(new String[] {"foo.`bar.baz`"}, FieldPath.ofV1("foo.`bar.baz`").path());
+    }
+
+    @Test void shouldBuildV2WithEmptyPath() {
+        assertArrayEquals(EMPTY_PATH, FieldPath.of("", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WithNullPath() {
+        assertArrayEquals(EMPTY_PATH, FieldPath.of(null, FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WithoutDots() {
+        assertArrayEquals(new String[] {"foobarbaz"}, FieldPath.of("foobarbaz", FieldSyntaxVersion.V2).path());
+    }
+    @Test void shouldBuildV2WithoutWrappingBackticks() {
+        assertArrayEquals(new String[] {"foo`bar`baz"}, FieldPath.of("foo`bar`baz", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WhenIncludesDots() {
+        assertArrayEquals(new String[] {"foo", "bar", "baz"}, FieldPath.of("foo.bar.baz", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WhenIncludesDotsAndBacktickPair() {
+        assertArrayEquals(new String[] {"foo", "bar.baz"}, FieldPath.of("foo.`bar.baz`", FieldSyntaxVersion.V2).path());
+        assertArrayEquals(new String[] {"foo", "bar", "baz"}, FieldPath.of("foo.`bar`.baz", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2AndIgnoreBackticksThatAreNotWrapping() {
+        assertArrayEquals(new String[] {"foo", "ba`r.baz"}, FieldPath.of("foo.`ba`r.baz`", FieldSyntaxVersion.V2).path());
+        assertArrayEquals(new String[] {"foo", "ba`r", "baz"}, FieldPath.of("foo.ba`r.baz", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2AndEscapeBackticks() {
+        assertArrayEquals(new String[] {"foo", "bar`.`baz"}, FieldPath.of("foo.`bar\\`.\\`baz`", FieldSyntaxVersion.V2).path());
+        assertArrayEquals(new String[] {"foo", "bar\\`.`baz"}, FieldPath.of("foo.`bar\\\\`.\\`baz`", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WithBackticksWrappingBackticks() {

Review Comment:
   Good catch. I will stick to the rules in this case, as using a backslash to escape backticks removes the need for this special case.
   Tests have been updated.
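For reference, the V2 rules pinned down by the tests above can be exercised with a stand-alone sketch. Dots separate steps. A step wrapped in backticks may contain dots. A backtick closes a wrapped step only when it is the last character, or when it is followed by a dot and not preceded by a backslash. A backslash directly before a backtick is an escape. The sketch mirrors the `buildFieldPathV2` loop quoted later in this thread, but `FieldPathV2Sketch`, `parse` and `unescape` are hypothetical names, it has no Kafka dependencies, and the unescaping rule is inferred from the test expectations rather than taken from the PR:

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone sketch of the V2 field path rules (hypothetical; no Kafka dependencies). */
final class FieldPathV2Sketch {
    private static final char BACKTICK = '`';
    private static final char DOT = '.';
    private static final char BACKSLASH = '\\';

    private FieldPathV2Sketch() {
    }

    /** Splits a V2 path expression into steps; null or empty input yields no steps. */
    static String[] parse(String pathText) {
        if (pathText == null || pathText.isEmpty()) {
            return new String[0];
        }
        // Without dots there is nothing to split: backticks are kept literally.
        if (pathText.indexOf(DOT) < 0) {
            return new String[] {pathText};
        }
        List<String> steps = new ArrayList<>();
        StringBuilder s = new StringBuilder(pathText);
        while (s.length() > 0) {
            if (s.charAt(0) == BACKTICK) {
                s.deleteCharAt(0); // drop the opening backtick
                int idx = 0;
                while (true) {
                    idx = s.indexOf(String.valueOf(BACKTICK), idx);
                    if (idx < 0) {
                        throw new IllegalArgumentException("Incomplete backtick pair at [...]`" + s);
                    }
                    boolean last = idx == s.length() - 1;
                    boolean beforeDot = !last && s.charAt(idx + 1) == DOT;
                    boolean escaped = idx > 0 && s.charAt(idx - 1) == BACKSLASH;
                    if (last || (beforeDot && !escaped)) {
                        // Closing backtick: everything before it is one step.
                        steps.add(unescape(s.substring(0, idx)));
                        // Drop the closing backtick and the dot after it (delete clamps at the end).
                        s.delete(0, idx + 2);
                        break;
                    }
                    idx++; // not a closing backtick: keep searching
                }
            } else {
                int atDot = s.indexOf(String.valueOf(DOT));
                if (atDot > 0) {
                    steps.add(unescape(s.substring(0, atDot)));
                    s.delete(0, atDot + 1); // drop the step and its trailing dot
                } else {
                    steps.add(unescape(s.toString())); // remaining text is the last step
                    s.setLength(0);
                }
            }
        }
        return steps.toArray(new String[0]);
    }

    /** Assumed rule: a backslash directly before a backtick is removed; nothing else changes. */
    static String unescape(String step) {
        return step.replace("\\`", "`");
    }
}
```

One deliberate difference from the quoted loop: the escape check is guarded with `idx > 0`. The quoted code evaluates `s.charAt(idx - 1)` even when the candidate closing backtick is the first remaining character (for example, a path made of two backticks followed by `.foo`), which would throw `StringIndexOutOfBoundsException`.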





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1167958328


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,232 @@
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.Map;
+
+/**
+ * Operations to update data values and schemas based on field paths.
+ * <p>
+ * See KIP-821.
+ *
+ * @see SingleFieldPath
+ * @see MultiFieldPaths
+ */
+public interface FieldPath {

Review Comment:
   True. My goal was to align the operations between these two implementations.
   I think full alignment is achievable, and then we can offer only the FieldPath interface to users: https://github.com/jeqo/kafka/pull/4
   Let me know what you think. It has some nice side effects, described in the internal PR.





[GitHub] [kafka] C0urante commented on a diff in pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1148760142


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,585 @@
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A FieldPath is composed by one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).a
+ * <p>
+ * If the SMT requires accessing multiple fields on the same data object,
+ * use {@code FieldPaths} instead.
+ * <p>
+ * The field path semantics are defined by the syntax version {@code FieldSyntaxVersion}.

Review Comment:
   ```suggestion
    * The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}.
   ```





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1168023325


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,585 @@
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A FieldPath is composed by one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).a
+ * <p>
+ * If the SMT requires accessing multiple fields on the same data object,
+ * use {@code FieldPaths} instead.
+ * <p>
+ * The field path semantics are defined by the syntax version {@code FieldSyntaxVersion}.
+ * <p>
+ * Paths are calculated once and cached for further access.
+ * <p>
+ * Invariants:
+ * <li>
+ *     <ul>A field path can contain one or more steps</ul>
+ * </li>
+ *
+ * See KIP-821.
+ *
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath implements FieldPath {
+
+    private static final char BACKTICK_CHAR = '`';
+    private static final char DOT_CHAR = '.';
+    private static final char BACKSLASH_CHAR = '\\';
+
+    private static final Cache<String, SingleFieldPath> PATHS_CACHE = new SynchronizedCache<>(new LRUCache<>(16));
+
+    private final String[] path;
+
+    static SingleFieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    static SingleFieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field   field path expression
+     * @param version field syntax version
+     */
+    public static SingleFieldPath of(String field, FieldSyntaxVersion version) {
+        if ((field == null || field.isEmpty()) // empty path
+                || version.equals(FieldSyntaxVersion.V1)) { // or V1
+            return new SingleFieldPath(field, version);
+        } else { // use cache when V2
+            final SingleFieldPath found = PATHS_CACHE.get(field);
+            if (found != null) {
+                return found;
+            } else {
+                final SingleFieldPath fieldPath = new SingleFieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        if (pathText == null || pathText.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {pathText};
+                    break;
+                case V2:
+                    path = buildFieldPathV2(pathText);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown syntax version: " + version);
+            }
+        }
+    }

Review Comment:
   I like it! I'm adapting the constructor to this approach.
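The suggestion being adopted is not quoted in this message. Judging from the later revision of `SingleFieldPath` quoted further down (a public constructor that starts with `Objects.requireNonNull` and switches on the version) and the follow-up question about caching, the direction seems to be eager parsing in the constructor, with no static factory or cache. A minimal sketch of that shape, assuming hypothetical `SyntaxVersion` and `SimpleFieldPath` types and, for brevity, plain dot splitting for V2 instead of the backtick-aware parsing:

```java
import java.util.Arrays;
import java.util.Objects;

/** Hypothetical stand-in for the PR's FieldSyntaxVersion enum. */
enum SyntaxVersion { V1, V2 }

/** Sketch of a cache-free field path: the steps are computed once, in the constructor. */
final class SimpleFieldPath {
    private final String[] path;

    SimpleFieldPath(String pathText, SyntaxVersion version) {
        Objects.requireNonNull(pathText, "Field path cannot be null");
        Objects.requireNonNull(version, "Syntax version cannot be null");
        switch (version) {
            case V1: // backward compatibility: the whole text is a single field name
                this.path = new String[] {pathText};
                break;
            case V2: // simplified for the sketch: split on every dot, no backtick handling
                this.path = pathText.split("\\.", -1);
                break;
            default:
                throw new IllegalArgumentException("Unknown syntax version: " + version);
        }
    }

    /** Returns a copy so callers cannot mutate the parsed steps. */
    String[] path() {
        return Arrays.copyOf(path, path.length);
    }
}
```

Because each instance parses once and never exposes its internal array, it is immutable. If an SMT builds its path once at configuration time, it gets most of what the cache offered without shared static state.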





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1167124072


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,585 @@
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A FieldPath is composed by one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).a
+ * <p>
+ * If the SMT requires accessing multiple fields on the same data object,
+ * use {@code FieldPaths} instead.
+ * <p>
+ * The field path semantics are defined by the syntax version {@code FieldSyntaxVersion}.
+ * <p>
+ * Paths are calculated once and cached for further access.
+ * <p>
+ * Invariants:
+ * <li>
+ *     <ul>A field path can contain one or more steps</ul>
+ * </li>
+ *
+ * See KIP-821.
+ *
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath implements FieldPath {
+
+    private static final char BACKTICK_CHAR = '`';
+    private static final char DOT_CHAR = '.';
+    private static final char BACKSLASH_CHAR = '\\';
+
+    private static final Cache<String, SingleFieldPath> PATHS_CACHE = new SynchronizedCache<>(new LRUCache<>(16));
+
+    private final String[] path;
+
+    static SingleFieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    static SingleFieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field   field path expression
+     * @param version field syntax version
+     */
+    public static SingleFieldPath of(String field, FieldSyntaxVersion version) {
+        if ((field == null || field.isEmpty()) // empty path
+                || version.equals(FieldSyntaxVersion.V1)) { // or V1
+            return new SingleFieldPath(field, version);
+        } else { // use cache when V2
+            final SingleFieldPath found = PATHS_CACHE.get(field);
+            if (found != null) {
+                return found;
+            } else {
+                final SingleFieldPath fieldPath = new SingleFieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        if (pathText == null || pathText.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {pathText};
+                    break;
+                case V2:
+                    path = buildFieldPathV2(pathText);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown syntax version: " + version);
+            }
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {
+        // if no dots or wrapping backticks are used, then return path with single step
+        if (!pathText.contains(String.valueOf(DOT_CHAR))) {
+            return new String[] {pathText};
+        } else {
+            // prepare for tracking path steps
+            final List<String> steps = new ArrayList<>();
+            // avoid creating new string on changes
+            final StringBuilder s = new StringBuilder(pathText);
+
+            while (s.length() > 0) { // until path is traversed
+                // start processing backtick pair, if any
+                if (s.charAt(0) == BACKTICK_CHAR) {
+                    s.deleteCharAt(0);
+
+                    // find backtick closing pair
+                    int idx = 0;
+                    while (idx >= 0) {
+                        idx = s.indexOf(String.valueOf(BACKTICK_CHAR), idx);
+                        if (idx == -1) { // if not found, fail
+                            throw new IllegalArgumentException("Incomplete backtick pair at [...]`" + s);
+                        }
+                        // check that it is not escaped or wrapped in another backticks pair
+                        if (idx < s.length() - 1 // not wrapping the whole field path
+                                && (s.charAt(idx + 1) != DOT_CHAR // not wrapping
+                                || s.charAt(idx - 1) == BACKSLASH_CHAR)) { // ... or escaped
+                            idx++; // move index forward and keep searching
+                        } else { // it's the closing pair
+                            steps.add(escapeBackticks(s.substring(0, idx)));
+                            s.delete(0, idx + 2); // rm backtick and dot
+                            break;
+                        }
+                    }
+                } else { // process dots in path
+                    final int atDot = s.indexOf(String.valueOf(DOT_CHAR));
+                    if (atDot > 0) { // get path step and move forward
+                        steps.add(escapeBackticks(s.substring(0, atDot)));
+                        s.delete(0, atDot + 1);
+                    } else { // add all
+                        steps.add(escapeBackticks(s.toString()));
+                        s.delete(0, s.length());
+                    }
+                }
+            }
+
+            return steps.toArray(new String[0]);
+        }
+    }
+
+    /**
+     * Return field name with escaped backticks, if any.
+     *
+     * @param field potentially containing backticks
+     * @throws IllegalArgumentException when there are incomplete backtick pairs
+     */
+    private String escapeBackticks(String field) {

Review Comment:
   ~would `removeEscapedBackticks` work better?~
   Not sure about it either. The purpose is to remove the backslash used to escape backticks. Maybe `processEscapedBackticks` would be a better name?
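The behavior under discussion, as pinned down by the `shouldBuildV2AndEscapeBackticks` test quoted at the top of the thread, is to drop the backslash in front of each escaped backtick and leave everything else alone. A one-line sketch using the proposed `processEscapedBackticks` name on a hypothetical holder class; the PR's own method body is not quoted here, and unlike its Javadoc this sketch does not check for incomplete backtick pairs:

```java
/** Hypothetical holder for the helper under discussion. */
final class EscapedBackticks {
    private EscapedBackticks() {
    }

    /** Removes the escaping backslash from every backslash-backtick pair; other characters are kept. */
    static String processEscapedBackticks(String step) {
        // String.replace scans left to right and replaces non-overlapping matches.
        return step.replace("\\`", "`");
    }

    public static void main(String[] args) {
        System.out.println(processEscapedBackticks("bar\\`.\\`baz")); // prints bar`.`baz
    }
}
```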
   





[GitHub] [kafka] C0urante commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1300486803


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldSyntaxVersion.java:
##########
@@ -0,0 +1,112 @@
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.Arrays;
+
+/**
+ * Defines semantics of field paths by versioning.
+ * <p>
+ * See KIP-821.
+ *
+ * @see SingleFieldPath
+ * @see MultiFieldPaths
+ */
+public enum FieldSyntaxVersion {
+    /**
+     * No support for nested fields. Only access attributes on the root data value.
+     * Backward compatibility before KIP-821.
+     */
+    V1("V1"),
+    /**
+     * Support for nested fields using dotted notation with backtick pairs to wrap field names that
+     * include dots.
+     * @since 3.x
+     */
+    V2("V2");
+
+    public static final String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+    public static final String FIELD_SYNTAX_VERSION_DOC =
+            "Defines the version of the syntax to access fields. "
+                    + "If set to `V1`, then the field paths are limited to access the elements at the root level of the struct or map."
+                    + "If set to `V2`, the syntax will support accessing nested elements. To access nested elements, "
+                    + "dotted notation is used. If dots are already included in the field name, then backtick pairs "
+                    + "can be used to wrap field names containing dots. "
+                    + "e.g. to access elements from a field in a struct/map named \"foo.bar\", "
+                    + "the following format can be used to access its elements: \"`foo.bar`.baz\".";
+
+    public static final String FIELD_SYNTAX_VERSION_DEFAULT_VALUE = V1.name();
+    public static final ConfigDef.Validator FIELD_SYNTAX_VERSION_VALIDATOR = new Validator();
+
+    public static ConfigDef baseConfigDef() {

Review Comment:
   `ConfigDef` is public API; we can't make changes to it without a KIP.
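Since `ConfigDef` can't change without a KIP, one option is to keep the parsing and validation next to the enum. It can then be plugged in through the existing `ConfigDef.define(...)` overloads and the `ConfigDef.Validator` interface, whose single method is `ensureValid(String name, Object value)`. A dependency-free sketch of the validation half, with a hypothetical `FieldSyntaxVersionSketch` enum. It throws `IllegalArgumentException` where a real validator would throw `ConfigException`, and the case-insensitive matching is an assumption, not something the quoted code shows:

```java
import java.util.Arrays;
import java.util.Locale;

/** Hypothetical stand-in for FieldSyntaxVersion, carrying its own parsing and validation. */
enum FieldSyntaxVersionSketch {
    V1,
    V2;

    static final String CONFIG_NAME = "field.syntax.version";

    /** Same shape as ConfigDef.Validator#ensureValid, minus the Kafka dependency. */
    static void ensureValid(String name, Object value) {
        fromConfigValue(name, value); // throws if the value is not a known version
    }

    /** Maps a raw config value to a version, ignoring case and surrounding whitespace. */
    static FieldSyntaxVersionSketch fromConfigValue(String name, Object value) {
        if (value instanceof String) {
            String normalized = ((String) value).trim().toUpperCase(Locale.ROOT);
            for (FieldSyntaxVersionSketch version : values()) {
                if (version.name().equals(normalized)) {
                    return version;
                }
            }
        }
        throw new IllegalArgumentException("Invalid value " + value + " for configuration " + name
                + ": must be one of " + Arrays.toString(values()));
    }
}
```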





[GitHub] [kafka] C0urante commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1300517797


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,500 @@
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A FieldPath is composed by one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).a
+ *
+ * <p>If the SMT requires accessing multiple fields on the same data object,
+ * use {@link MultiFieldPaths} instead.
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}.
+ *
+ * <p>Paths are calculated once and cached for further access.
+ *

Review Comment:
   We removed caching, right?
   
   ```suggestion
   ```



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,500 @@
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A FieldPath is composed by one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).a

Review Comment:
   ```suggestion
    * A SingleFieldPath is composed of one or more field names, known as steps,
    * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).
   ```



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,500 @@
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A FieldPath is composed by one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).a
+ *
+ * <p>If the SMT requires accessing multiple fields on the same data object,
+ * use {@link MultiFieldPaths} instead.
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}.
+ *
+ * <p>Paths are calculated once and cached for further access.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final String[] path;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        Objects.requireNonNull(pathText, "Field path cannot be null");
+        switch (version) {
+            case V1: // backward compatibility
+                this.path = new String[] {pathText};
+                break;
+            case V2:
+                this.path = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " + version);
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {
+        final List<String> steps = new ArrayList<>();
+        int idx = 0;
+        while (idx < pathText.length() && idx >= 0) {
+            if (pathText.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = pathText.indexOf(String.valueOf(DOT), idx);
+                if (idx > 0) { // get path step and move forward
+                    String field = pathText.substring(start, idx);
+                    steps.add(field);
+                    idx++;
+                } else { // add all
+                    String field = pathText.substring(start);
+                    steps.add(field);
+                }
+            } else {
+                StringBuilder field = new StringBuilder();
+                idx++;
+                int start = idx;
+                while (true) {
+                    idx = pathText.indexOf(String.valueOf(BACKTICK), idx);
+                    if (idx == -1) { // if not found, fail
+                        throw new IllegalArgumentException("Incomplete backtick pair in path: " + pathText);
+                    }
+
+                    boolean atEndOfPath = idx >= pathText.length() - 1;
+                    if (atEndOfPath) {
+                        field.append(pathText, start, idx);
+                        // we've reached the end of the path, and the last character is the backtick
+                        steps.add(field.toString());
+                        idx++;
+                        break;
+                    }
+
+                    boolean notFollowedByDot = pathText.charAt(idx + 1) != DOT;
+                    if (notFollowedByDot) {
+                        boolean afterABackslash = pathText.charAt(idx - 1) == BACKSLASH;
+                        if (afterABackslash) {
+                            // this backtick was escaped; include it in the field name, but continue
+                            // looking for an unescaped matching backtick
+                            field.append(pathText, start, idx - 1)
+                                .append(BACKTICK);
+
+                            idx++;
+                            start = idx;
+                        } else {
+                            // this backtick isn't followed by a dot; include it in the field name, but continue
+                            // looking for a matching backtick that is followed by a dot
+                            idx++;
+                        }
+                        continue;
+                    }
+
+                    boolean afterABackslash = pathText.charAt(idx - 1) == BACKSLASH;
+                    if (afterABackslash) {
+                        // this backtick was escaped; include it in the field name, but continue
+                        // looking for an unescaped matching backtick
+                        field.append(pathText, start, idx - 1)
+                            .append(BACKTICK);
+
+                        idx++;
+                        start = idx;
+                        continue;
+                    }
+                    // we've found our matching backtick
+                    field.append(pathText, start, idx);
+                    steps.add(field.toString());
+                    idx += 2; // increment by two to include the backtick and the dot after it
+                    break;
+                }
+
+

Review Comment:
   Nit: unnecessary newlines (allows less code to fit on the screen, makes it a little harder to read)
   ```suggestion
   ```



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A FieldPath is composed by one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).a
+ *
+ * <p>If the SMT requires accessing multiple fields on the same data object,
+ * use {@link MultiFieldPaths} instead.
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}.
+ *
+ * <p>Paths are calculated once and cached for further access.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final String[] path;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        Objects.requireNonNull(pathText, "Field path cannot be null");
+        switch (version) {
+            case V1: // backward compatibility
+                this.path = new String[] {pathText};
+                break;
+            case V2:
+                this.path = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " + version);
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {
+        final List<String> steps = new ArrayList<>();
+        int idx = 0;
+        while (idx < pathText.length() && idx >= 0) {
+            if (pathText.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = pathText.indexOf(String.valueOf(DOT), idx);
+                if (idx > 0) { // get path step and move forward
+                    String field = pathText.substring(start, idx);
+                    steps.add(field);
+                    idx++;
+                } else { // add all
+                    String field = pathText.substring(start);
+                    steps.add(field);
+                }
+            } else {
+                StringBuilder field = new StringBuilder();
+                idx++;
+                int start = idx;
+                while (true) {
+                    idx = pathText.indexOf(String.valueOf(BACKTICK), idx);
+                    if (idx == -1) { // if not found, fail
+                        throw new IllegalArgumentException("Incomplete backtick pair in path: " + pathText);
+                    }
+
+                    boolean atEndOfPath = idx >= pathText.length() - 1;
+                    if (atEndOfPath) {
+                        field.append(pathText, start, idx);
+                        // we've reached the end of the path, and the last character is the backtick
+                        steps.add(field.toString());
+                        idx++;
+                        break;
+                    }
+
+                    boolean notFollowedByDot = pathText.charAt(idx + 1) != DOT;
+                    if (notFollowedByDot) {
+                        boolean afterABackslash = pathText.charAt(idx - 1) == BACKSLASH;
+                        if (afterABackslash) {
+                            // this backtick was escaped; include it in the field name, but continue
+                            // looking for an unescaped matching backtick
+                            field.append(pathText, start, idx - 1)
+                                .append(BACKTICK);
+
+                            idx++;
+                            start = idx;
+                        } else {
+                            // this backtick isn't followed by a dot; include it in the field name, but continue
+                            // looking for a matching backtick that is followed by a dot
+                            idx++;
+                        }
+                        continue;
+                    }
+
+                    boolean afterABackslash = pathText.charAt(idx - 1) == BACKSLASH;
+                    if (afterABackslash) {
+                        // this backtick was escaped; include it in the field name, but continue
+                        // looking for an unescaped matching backtick
+                        field.append(pathText, start, idx - 1)
+                            .append(BACKTICK);
+
+                        idx++;
+                        start = idx;
+                        continue;
+                    }
+                    // we've found our matching backtick
+                    field.append(pathText, start, idx);
+                    steps.add(field.toString());
+                    idx += 2; // increment by two to include the backtick and the dot after it
+                    break;
+                }
+
+
+            }
+        }
+        return steps.toArray(new String[0]);
+    }
+
+
+    /**
+     * Access a {@code Field} at the current path within a schema {@code Schema}
+     * If field is not found, then {@code null} is returned.
+     */
+    public Field fieldFrom(Schema schema) {
+        if (path.length == 1) {
+            return schema.field(path[0]);
+        } else {
+            Schema current = schema;
+            for (int i = 0; i < path.length; i++) {
+                if (current == null) {
+                    return null;
+                }
+                if (i == path.length - 1) { // get value
+                    return current.field(path[i]);
+                } else { // iterate
+                    current = current.field(path[i]).schema();

Review Comment:
   Won't this cause an NPE if the field doesn't exist in this schema?
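   To make the concern concrete, here is a minimal sketch of a null-safe version of the loop. It uses hypothetical stand-ins `MiniSchema` and `MiniField` (not Kafka classes) so that it runs on its own. The only point is that each step's `Field` is checked for `null` before `schema()` is called on it. One caveat: in Kafka Connect, `ConnectSchema.field(String)` throws a `DataException` on a non-struct schema instead of returning `null`, so real code would probably also need to check `schema.type() == Type.STRUCT` before each lookup.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   // Hypothetical stand-in for Kafka Connect's Field: just a name and a nested schema.
   final class MiniField {
       private final String name;
       private final MiniSchema schema;

       MiniField(String name, MiniSchema schema) {
           this.name = name;
           this.schema = schema;
       }

       String name() { return name; }

       MiniSchema schema() { return schema; }
   }

   // Hypothetical stand-in for Kafka Connect's Schema; field(name) returns null when absent.
   final class MiniSchema {
       private final Map<String, MiniField> fields = new LinkedHashMap<>();

       MiniSchema with(String name, MiniSchema fieldSchema) {
           fields.put(name, new MiniField(name, fieldSchema));
           return this;
       }

       MiniField field(String name) { return fields.get(name); }
   }

   final class NullSafeFieldLookup {
       // Walks the path one step at a time and returns null as soon as a step is missing,
       // instead of calling schema() on a null Field.
       static MiniField fieldFrom(MiniSchema schema, String[] path) {
           MiniSchema current = schema;
           MiniField field = null;
           for (String step : path) {
               if (current == null) {
                   return null; // the previous step had no nested schema to descend into
               }
               field = current.field(step);
               if (field == null) {
                   return null; // this step does not exist at this level
               }
               current = field.schema();
           }
           return field;
       }
   }
   ```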



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A FieldPath is composed by one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).a
+ *
+ * <p>If the SMT requires accessing multiple fields on the same data object,
+ * use {@link MultiFieldPaths} instead.
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}.
+ *
+ * <p>Paths are calculated once and cached for further access.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final String[] path;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        Objects.requireNonNull(pathText, "Field path cannot be null");
+        switch (version) {
+            case V1: // backward compatibility
+                this.path = new String[] {pathText};
+                break;
+            case V2:
+                this.path = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " + version);
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {
+        final List<String> steps = new ArrayList<>();
+        int idx = 0;
+        while (idx < pathText.length() && idx >= 0) {
+            if (pathText.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = pathText.indexOf(String.valueOf(DOT), idx);
+                if (idx > 0) { // get path step and move forward
+                    String field = pathText.substring(start, idx);
+                    steps.add(field);
+                    idx++;
+                } else { // add all
+                    String field = pathText.substring(start);
+                    steps.add(field);
+                }
+            } else {
+                StringBuilder field = new StringBuilder();
+                idx++;
+                int start = idx;
+                while (true) {
+                    idx = pathText.indexOf(String.valueOf(BACKTICK), idx);
+                    if (idx == -1) { // if not found, fail
+                        throw new IllegalArgumentException("Incomplete backtick pair in path: " + pathText);
+                    }
+
+                    boolean atEndOfPath = idx >= pathText.length() - 1;

Review Comment:
   The idea with [this suggestion](https://github.com/apache/kafka/pull/12637#discussion_r1148773643) was to simplify the `if` statement because there were multiple boolean conditions that we were evaluating.
   
   That strategy isn't as useful here because each condition is the only thing we evaluate in each corresponding `if` statement, and each condition is only used once.
   
   IMO it'd be more readable to just keep things simple in this method, e.g. `if (idx >= pathText.length() - 1)`.



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/MultiFieldPaths.java:
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Multiple field paths to access data objects ({@code Struct} or {@code Map}) efficiently,
+ * instead of multiple individual {@link SingleFieldPath single-field paths}.
+ *
+ * <p>If the SMT requires accessing a single field on the same data object,
+ * use {@link SingleFieldPath} instead.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see SingleFieldPath
+ * @see FieldSyntaxVersion
+ */
+public class MultiFieldPaths {
+    // Invariants:
+    // - Tree values contain either a nested tree or a field path
+    // - A tree can contain paths that are a subset of other paths
+    //   (e.g. foo and foo.bar in V2 would be kept)
+    final Map<String, Object> pathTree;

Review Comment:
   It looks like we've implemented a [trie](https://en.wikipedia.org/wiki/Trie) here. Usually these types of data structures are implemented with recursive types. In this case, we might have a `boolean leaf` field (which is used to track whether the current field should be included in the transformation) and a `Map<String, MultiFieldPaths> children` field that contains all children that share the same prefix, and can be empty if the current node is a leaf node.
   
   This would be a bit cleaner if we could manage it. Do you think you could give it a shot?
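   A rough sketch of the recursive shape described above, using a hypothetical `PathTrie` class rather than the actual `MultiFieldPaths` API. Here `leaf` means "a configured path ends at this node", so a node can be a leaf and still have children, which covers the `foo` / `foo.bar` invariant noted in the diff.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical recursive trie node: `leaf` marks that a configured path ends here,
   // `children` holds the sub-tries keyed by the next path step.
   final class PathTrie {
       private boolean leaf;
       private final Map<String, PathTrie> children = new HashMap<>();

       // Inserts one parsed path, e.g. {"foo", "bar"} for foo.bar in V2 syntax.
       void add(String[] steps) {
           PathTrie node = this;
           for (String step : steps) {
               node = node.children.computeIfAbsent(step, k -> new PathTrie());
           }
           node.leaf = true;
       }

       // True only if exactly this path was added, not merely a prefix of another path.
       boolean containsPath(String[] steps) {
           PathTrie node = this;
           for (String step : steps) {
               node = node.children.get(step);
               if (node == null) {
                   return false;
               }
           }
           return node.leaf;
       }

       // Sub-trie for the next step, or null if no configured path continues that way.
       PathTrie child(String step) {
           return children.get(step);
       }

       boolean isLeaf() {
           return leaf;
       }
   }
   ```

   A transform could then walk the data object and the trie together, descending with `child(step)` and applying its logic wherever `isLeaf()` is true.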



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A FieldPath is composed by one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).a
+ *
+ * <p>If the SMT requires accessing multiple fields on the same data object,
+ * use {@link MultiFieldPaths} instead.
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}.
+ *
+ * <p>Paths are calculated once and cached for further access.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final String[] path;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        Objects.requireNonNull(pathText, "Field path cannot be null");
+        switch (version) {
+            case V1: // backward compatibility
+                this.path = new String[] {pathText};
+                break;
+            case V2:
+                this.path = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " + version);
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {
+        final List<String> steps = new ArrayList<>();
+        int idx = 0;
+        while (idx < pathText.length() && idx >= 0) {
+            if (pathText.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = pathText.indexOf(String.valueOf(DOT), idx);
+                if (idx > 0) { // get path step and move forward

Review Comment:
   Sorry, I screwed this part up: it should be `>=`, not `>`. It's possible (though unlikely) that someone could give us something like `...`, and right now that would cause an infinite loop while trying to parse the path.
   
   Also, to catch another similar edge case, we should add this check at the very end of the method, right before the return statement:
   
   ```java
   if (!pathText.isEmpty() && pathText.charAt(pathText.length() - 1) == DOT) {
       steps.add("");
   }
   ```
   
   I verified this with a new test case in the `SingleFieldPathTest` suite, in case it helps:
   ```java
   @Test
   public void testEmptyFieldNames() {
       assertArrayEquals(
               new String[] {"", "", ""},
               new SingleFieldPath("..", FieldSyntaxVersion.V2).path()
       );
       assertArrayEquals(
               new String[] {"foo", "", ""},
               new SingleFieldPath("foo..", FieldSyntaxVersion.V2).path()
       );
       assertArrayEquals(
               new String[] {"", "bar", ""},
               new SingleFieldPath(".bar.", FieldSyntaxVersion.V2).path()
       );
       assertArrayEquals(
               new String[] {"", "", "baz"},
               new SingleFieldPath("..baz", FieldSyntaxVersion.V2).path()
       );
   }
   ```
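   For reference, a stripped-down sketch of just the unquoted branch of `buildFieldPathV2` with both fixes applied (`>=` instead of `>`, plus the trailing-dot check). Backtick handling is deliberately left out and `UnquotedPathSplitter` is a hypothetical name; it only illustrates why the loop now always terminates and where the empty steps come from.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical, simplified splitter: only the non-backtick branch of buildFieldPathV2.
   final class UnquotedPathSplitter {
       private static final char DOT = '.';

       static String[] split(String pathText) {
           final List<String> steps = new ArrayList<>();
           int idx = 0;
           while (idx < pathText.length() && idx >= 0) {
               final int start = idx;
               idx = pathText.indexOf(DOT, idx);
               if (idx >= 0) { // a dot at index 0 is still a separator, hence >= rather than >
                   steps.add(pathText.substring(start, idx));
                   idx++; // always moves past the dot, so the loop makes progress
               } else { // no more dots: the rest of the text is the final step (idx is now -1)
                   steps.add(pathText.substring(start));
               }
           }
           // A trailing dot separates one last, empty step
           if (!pathText.isEmpty() && pathText.charAt(pathText.length() - 1) == DOT) {
               steps.add("");
           }
           return steps.toArray(new String[0]);
       }
   }
   ```

   With `>`, an input that starts with a dot leaves `idx` at 0 after `indexOf`, so the `else` branch runs without advancing `idx` and the loop never exits.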



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A FieldPath is composed by one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).a
+ *
+ * <p>If the SMT requires accessing multiple fields on the same data object,
+ * use {@link MultiFieldPaths} instead.
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}.
+ *
+ * <p>Paths are calculated once and cached for further access.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final String[] path;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        Objects.requireNonNull(pathText, "Field path cannot be null");
+        switch (version) {
+            case V1: // backward compatibility
+                this.path = new String[] {pathText};
+                break;
+            case V2:
+                this.path = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " + version);
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {
+        final List<String> steps = new ArrayList<>();
+        int idx = 0;
+        while (idx < pathText.length() && idx >= 0) {
+            if (pathText.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = pathText.indexOf(String.valueOf(DOT), idx);
+                if (idx > 0) { // get path step and move forward
+                    String field = pathText.substring(start, idx);
+                    steps.add(field);
+                    idx++;
+                } else { // add all
+                    String field = pathText.substring(start);
+                    steps.add(field);
+                }
+            } else {
+                StringBuilder field = new StringBuilder();
+                idx++;
+                int start = idx;
+                while (true) {
+                    idx = pathText.indexOf(String.valueOf(BACKTICK), idx);
+                    if (idx == -1) { // if not found, fail
+                        throw new IllegalArgumentException("Incomplete backtick pair in path: " + pathText);
+                    }
+
+                    boolean atEndOfPath = idx >= pathText.length() - 1;
+                    if (atEndOfPath) {
+                        field.append(pathText, start, idx);
+                        // we've reached the end of the path, and the last character is the backtick
+                        steps.add(field.toString());
+                        idx++;
+                        break;
+                    }
+
+                    boolean notFollowedByDot = pathText.charAt(idx + 1) != DOT;
+                    if (notFollowedByDot) {
+                        boolean afterABackslash = pathText.charAt(idx - 1) == BACKSLASH;
+                        if (afterABackslash) {
+                            // this backtick was escaped; include it in the field name, but continue
+                            // looking for an unescaped matching backtick
+                            field.append(pathText, start, idx - 1)
+                                .append(BACKTICK);
+
+                            idx++;
+                            start = idx;
+                        } else {
+                            // this backtick isn't followed by a dot; include it in the field name, but continue
+                            // looking for a matching backtick that is followed by a dot
+                            idx++;
+                        }
+                        continue;
+                    }
+
+                    boolean afterABackslash = pathText.charAt(idx - 1) == BACKSLASH;
+                    if (afterABackslash) {
+                        // this backtick was escaped; include it in the field name, but continue
+                        // looking for an unescaped matching backtick
+                        field.append(pathText, start, idx - 1)
+                            .append(BACKTICK);
+
+                        idx++;
+                        start = idx;
+                        continue;
+                    }
+                    // we've found our matching backtick
+                    field.append(pathText, start, idx);
+                    steps.add(field.toString());
+                    idx += 2; // increment by two to include the backtick and the dot after it
+                    break;
+                }
+
+
+            }
+        }
+        return steps.toArray(new String[0]);
+    }
+
+
+    /**
+     * Access a {@code Field} at the current path within a schema {@code Schema}
+     * If field is not found, then {@code null} is returned.
+     */
+    public Field fieldFrom(Schema schema) {
+        if (path.length == 1) {
+            return schema.field(path[0]);
+        } else {
+            Schema current = schema;
+            for (int i = 0; i < path.length; i++) {
+                if (current == null) {
+                    return null;
+                }
+                if (i == path.length - 1) { // get value
+                    return current.field(path[i]);
+                } else { // iterate
+                    Field field = current.field(path[i]);
+                    current = field == null ? null : field.schema();
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Access a value at the current path within a schema-based {@code Struct}.
+     * If the value is not found, then {@code null} is returned.
+     */
+    public Object valueFrom(Struct struct) {
+        if (path.length == 1) {
+            return struct.get(path[0]);
+        } else {
+            Struct current = struct;
+            for (int i = 0; i < path.length; i++) {
+                if (current == null) {
+                    return null;
+                }
+                if (i == path.length - 1) { // get value
+                    return current.get(path[i]);
+                } else { // iterate
+                    current = current.getStruct(path[i]);

Review Comment:
   Took a stab at this; it's a little unclean, but all the tests pass. Let me know what you think.
   
   ```java
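   public Object valueFrom(Struct struct) {
       Struct current = struct;
       for (String pathSegment : Arrays.asList(path).subList(0, path.length - 1)) {
           // Check to see if the field actually exists
           if (current.schema().field(pathSegment) == null) {
               return null;
           }
           current = current.getStruct(pathSegment);
           // A null intermediate struct leaves nothing further to traverse
           if (current == null) {
               return null;
           }
       }
       // Struct::get throws a DataException for unknown fields, so check the last step too
       String lastSegment = path[path.length - 1];
       if (current.schema().field(lastSegment) == null) {
           return null;
       }
       return current.get(lastSegment);
   }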
   ```
   
   (It also feels like it might be a little more ergonomic if `path` were a `List` instead of an array, but that's not a blocker)
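   As a rough illustration of the `List`-based alternative, here is a minimal sketch. It assumes schemaless values held as nested `Map<String, Object>`, since a `Struct` version would need the Connect API. The class name `NestedLookup` is hypothetical. Each intermediate step is checked before descending, so a missing, null, or non-map step yields `null` instead of an exception:

   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical sketch: a path held as a List<String>, walked over schemaless
   // nested maps, returning null instead of throwing when a step is missing.
   public class NestedLookup {

       private final List<String> path;

       NestedLookup(String... steps) {
           if (steps.length == 0) {
               throw new IllegalArgumentException("A field path needs at least one step");
           }
           this.path = Arrays.asList(steps);
       }

       Object valueFrom(Map<String, Object> root) {
           Map<?, ?> current = root;
           // Walk every step except the last, descending into nested maps
           for (String step : path.subList(0, path.size() - 1)) {
               Object next = current.get(step);
               if (!(next instanceof Map)) {
                   // missing step, null value, or non-map value: nothing to descend into
                   return null;
               }
               current = (Map<?, ?>) next;
           }
           // Map::get already returns null for a missing key
           return current.get(path.get(path.size() - 1));
       }

       public static void main(String[] args) {
           Map<String, Object> inner = new HashMap<>();
           inner.put("baz", 42);
           Map<String, Object> root = new HashMap<>();
           root.put("foo", inner);

           System.out.println(new NestedLookup("foo", "baz").valueFrom(root));     // 42
           System.out.println(new NestedLookup("foo", "missing").valueFrom(root)); // null
           System.out.println(new NestedLookup("nope", "baz").valueFrom(root));    // null
       }
   }
   ```

   With a `List`, `subList` expresses "all but the last step" directly. With an array, the same code needs `Arrays.asList(path)` first.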



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A field path is composed of one or more field names, known as steps,
+ * used to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).
+ *
+ * <p>If the SMT requires accessing multiple fields on the same data object,
+ * use {@link MultiFieldPaths} instead.
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}.
+ *
+ * <p>Paths are calculated once and cached for further access.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final String[] path;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        Objects.requireNonNull(pathText, "Field path cannot be null");
+        switch (version) {
+            case V1: // backward compatibility
+                this.path = new String[] {pathText};
+                break;
+            case V2:
+                this.path = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " + version);
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {
+        final List<String> steps = new ArrayList<>();
+        int idx = 0;
+        while (idx < pathText.length() && idx >= 0) {
+            if (pathText.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = pathText.indexOf(String.valueOf(DOT), idx);
+                if (idx >= 0) { // get path step and move forward (a dot at index 0 yields an empty step)
+                    String field = pathText.substring(start, idx);
+                    steps.add(field);
+                    idx++;
+                } else { // add all
+                    String field = pathText.substring(start);
+                    steps.add(field);
+                }
+            } else {
+                StringBuilder field = new StringBuilder();
+                idx++;
+                int start = idx;
+                while (true) {
+                    idx = pathText.indexOf(String.valueOf(BACKTICK), idx);
+                    if (idx == -1) { // if not found, fail
+                        throw new IllegalArgumentException("Incomplete backtick pair in path: " + pathText);
+                    }
+
+                    boolean atEndOfPath = idx >= pathText.length() - 1;
+                    if (atEndOfPath) {
+                        field.append(pathText, start, idx);
+                        // we've reached the end of the path, and the last character is the backtick
+                        steps.add(field.toString());
+                        idx++;
+                        break;
+                    }
+
+                    boolean notFollowedByDot = pathText.charAt(idx + 1) != DOT;
+                    if (notFollowedByDot) {
+                        boolean afterABackslash = pathText.charAt(idx - 1) == BACKSLASH;
+                        if (afterABackslash) {
+                            // this backtick was escaped; include it in the field name, but continue
+                            // looking for an unescaped matching backtick
+                            field.append(pathText, start, idx - 1)
+                                .append(BACKTICK);
+
+                            idx++;
+                            start = idx;
+                        } else {
+                            // this backtick isn't followed by a dot; include it in the field name, but continue
+                            // looking for a matching backtick that is followed by a dot
+                            idx++;
+                        }
+                        continue;
+                    }
+
+                    boolean afterABackslash = pathText.charAt(idx - 1) == BACKSLASH;
+                    if (afterABackslash) {
+                        // this backtick was escaped; include it in the field name, but continue
+                        // looking for an unescaped matching backtick
+                        field.append(pathText, start, idx - 1)
+                            .append(BACKTICK);
+
+                        idx++;
+                        start = idx;
+                        continue;
+                    }
+                    // we've found our matching backtick
+                    field.append(pathText, start, idx);
+                    steps.add(field.toString());
+                    idx += 2; // increment by two to include the backtick and the dot after it
+                    break;
+                }
+
+
+            }
+        }
+        return steps.toArray(new String[0]);
+    }
+
+
+    /**
+     * Access a {@code Field} at the current path within a {@code Schema}.
+     * If the field is not found, then {@code null} is returned.
+     */
+    public Field fieldFrom(Schema schema) {
+        if (path.length == 1) {
+            return schema.field(path[0]);
+        } else {
+            Schema current = schema;
+            for (int i = 0; i < path.length; i++) {
+                if (current == null) {
+                    return null;
+                }
+                if (i == path.length - 1) { // get value
+                    return current.field(path[i]);
+                } else { // iterate
+                    Field field = current.field(path[i]);
+                    current = field == null ? null : field.schema();
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Access a value at the current path within a schema-based {@code Struct}.
+     * If the value is not found, then {@code null} is returned.
+     */
+    public Object valueFrom(Struct struct) {
+        if (path.length == 1) {
+            return struct.get(path[0]);
+        } else {
+            Struct current = struct;
+            for (int i = 0; i < path.length; i++) {
+                if (current == null) {
+                    return null;
+                }
+                if (i == path.length - 1) { // get value
+                    return current.get(path[i]);
+                } else { // iterate
+                    current = current.getStruct(path[i]);

Review Comment:
   Won't this cause a `DataException` if the field isn't found in this struct?





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

Posted by GitBox <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r975536673


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a Connect key or value structure (e.g. {@code Struct} or
+ * {@code Map<String, Object>}).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, they can be escaped by wrapping the field names in backticks.</li>
+ * <li>If field names contain backticks at wrapping positions (beginning or end of path, before or after dots), then those backticks need to be
+ * escaped with a backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+    private static final String BACKTICK = "`";
+    private static final String DOT = ".";
+    public static final char BACKTICK_CHAR = '`';
+    public static final char DOT_CHAR = '.';
+    public static final char BACKSLASH_CHAR = '\\';
+
+    private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+    private final String[] path;
+
+    public static FieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    public static FieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field field path expression
+     * @param version  field syntax version
+     */
+    public static FieldPath of(String field, FieldSyntaxVersion version) {
+        if (field == null || field.isEmpty() || version.equals(FieldSyntaxVersion.V1)) {
+            return new FieldPath(field, version);
+        } else {
+            if (PATHS_CACHE.containsKey(field)) {
+                return PATHS_CACHE.get(field);
+            } else {
+                final FieldPath fieldPath = new FieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    FieldPath(String path, FieldSyntaxVersion version) {
+        if (path == null || path.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {path};
+                    break;
+                case V2:
+                    // if no dots or wrapping backticks are used, then return path with single step
+                    if (!path.contains(DOT)
+                        && !(path.startsWith(BACKTICK) && path.endsWith(
+                        BACKTICK))) {
+                        this.path = new String[] {path};
+                    } else {
+                        // prepare for tracking path steps
+                        final List<String> steps = new ArrayList<>();
+                        final StringBuilder s = new StringBuilder(
+                            path); // avoid creating new string on changes
+
+                        while (s.length() > 0) { // until the whole path is traversed
+                            // process backtick pair if any
+                            if (s.charAt(0) == BACKTICK_CHAR) {
+                                s.deleteCharAt(0);
+
+                                // find backtick pair
+                                int idx = 0;
+                                while (idx >= 0) {
+                                    idx = s.indexOf(BACKTICK, idx);
+                                    if (idx == -1) {
+                                        throw new IllegalArgumentException(
+                                            "Incomplete backtick pair at [...]`" + s);
+                                    }
+                                    if (idx != s.length() - 1) { // non-global backtick
+                                        if (s.charAt(idx + 1) != DOT_CHAR
+                                            || (idx > 0 && s.charAt(idx - 1)
+                                            == BACKSLASH_CHAR)) { // not wrapped or escaped
+                                            idx++; // move index forward and keep searching
+                                        } else { // it's end pair
+                                            steps.add(
+                                                checkIncompleteBacktickPair(s.substring(0, idx)));
+                                            s.delete(0, idx + 2); // rm backtick and dot
+                                            break;
+                                        }
+                                    } else { // global backtick

Review Comment:
   Not really. I was trying to optimize for that scenario, though there isn't really much to optimize for, haha. I removed the concept to avoid confusion.
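   For reference, the V2 rules described in the `FieldPath` javadoc can be parsed in a single pass without a separate "global backtick" case. This is a minimal sketch, not the PR's implementation. The class and method names are hypothetical, and edge cases such as an escaped backtick at the very end of the path are simplified:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical sketch of the V2 rules: dots separate steps, a step wrapped in
   // backticks may contain dots, and "\`" inside a wrapped step is a literal backtick.
   public class PathSyntaxSketch {

       static List<String> parse(String path) {
           List<String> steps = new ArrayList<>();
           int i = 0;
           while (i < path.length()) {
               if (path.charAt(i) == '`') {
                   // Wrapped step: collect characters until the closing backtick
                   StringBuilder step = new StringBuilder();
                   int j = i + 1;
                   while (true) {
                       if (j >= path.length()) {
                           throw new IllegalArgumentException("Incomplete backtick pair in path: " + path);
                       }
                       char c = path.charAt(j);
                       if (c == '\\' && j + 1 < path.length() && path.charAt(j + 1) == '`') {
                           step.append('`'); // escaped backtick: keep the backtick, drop the backslash
                           j += 2;
                       } else if (c == '`' && (j + 1 == path.length() || path.charAt(j + 1) == '.')) {
                           break; // closing backtick: at the end of the path or followed by a dot
                       } else {
                           step.append(c); // any other character, including dots and inner backticks
                           j++;
                       }
                   }
                   steps.add(step.toString());
                   i = j + 2; // skip the closing backtick and the dot after it
               } else {
                   // Plain step: everything up to the next dot (or the end of the path)
                   int dot = path.indexOf('.', i);
                   int end = dot >= 0 ? dot : path.length();
                   steps.add(path.substring(i, end));
                   i = end + 1;
               }
           }
           return steps;
       }

       public static void main(String[] args) {
           System.out.println(parse("foo.bar.baz"));   // [foo, bar, baz]
           System.out.println(parse("`foo.bar`.baz")); // [foo.bar, baz]
           System.out.println(parse("`a\\`b`"));       // [a`b]
       }
   }
   ```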





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1167626084


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldSyntaxVersion.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.Arrays;
+
+/**
+ * Defines the semantics of field paths, by syntax version.
+ * <p>
+ * See KIP-821.
+ *
+ * @see SingleFieldPath
+ * @see MultiFieldPaths
+ */
+public enum FieldSyntaxVersion {
+    /**
+     * No support for nested fields: only attributes on the root data value can be accessed.
+     * This is the behavior before KIP-821, kept for backward compatibility.
+     */
+    V1("V1"),
+    /**
+     * Support for nested fields using dotted notation with backtick pairs to wrap field names that
+     * include dots.
+     * @since 3.x
+     */
+    V2("V2");
+
+    public static final String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+    public static final String FIELD_SYNTAX_VERSION_DOC =
+            "Defines the version of the syntax to access fields. "
+                    + "If set to `V1`, then the field paths are limited to accessing the elements at the root level of the struct or map. "
+                    + "If set to `V2`, the syntax will support accessing nested elements. To access nested elements, "
+                    + "dotted notation is used. If dots are already included in the field name, then backtick pairs "
+                    + "can be used to wrap field names containing dots. "
+                    + "For example, to access the element \"baz\" within a field named \"foo.bar\" in a struct/map, "
+                    + "the following format can be used: \"`foo.bar`.baz\".";
+
+    public static final String FIELD_SYNTAX_VERSION_DEFAULT_VALUE = V1.name();
+    public static final ConfigDef.Validator FIELD_SYNTAX_VERSION_VALIDATOR = new Validator();
+
+    public static ConfigDef baseConfigDef() {

Review Comment:
   Good catch. I also see that `new ConfigDef(base)` may already cover this, but that assumes existing transformers create their ConfigDefs from scratch.
   What about adding this method to `ConfigDef` itself?
   I gave it a try and added `ConfigDef addDefinition(ConfigDef toAdd)` to `ConfigDef`. Let me know what you think.
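   To make the proposed merge semantics concrete, here is a hypothetical plain-Java model of an `addDefinition`-style helper. It does not use `ConfigDef`, and the class name and the sample `field` key are invented. It copies every key from another definition set and, like `ConfigDef` itself, refuses a key that is defined twice:

   ```java
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.Map;

   // Hypothetical model of an "addDefinition"-style merge: configuration keys are
   // copied from another definition set, and a key defined twice is rejected.
   // Real transforms would use org.apache.kafka.common.config.ConfigDef instead.
   public class DefinitionsSketch {

       private final Map<String, String> docsByKey = new LinkedHashMap<>();

       DefinitionsSketch define(String key, String doc) {
           if (docsByKey.containsKey(key)) {
               throw new IllegalStateException("Configuration " + key + " is defined twice.");
           }
           docsByKey.put(key, doc);
           return this;
       }

       // Copy every key from 'toAdd' into this set, keeping the duplicate-key check
       DefinitionsSketch addDefinition(DefinitionsSketch toAdd) {
           toAdd.docsByKey.forEach(this::define);
           return this;
       }

       Map<String, String> keys() {
           return Collections.unmodifiableMap(docsByKey);
       }

       public static void main(String[] args) {
           DefinitionsSketch base = new DefinitionsSketch()
               .define("field.syntax.version", "Defines the version of the syntax to access fields.");
           DefinitionsSketch transform = new DefinitionsSketch()
               .define("field", "Hypothetical transform-specific key.")
               .addDefinition(base);
           System.out.println(transform.keys().keySet()); // [field, field.syntax.version]
       }
   }
   ```

   Keeping the duplicate check in the merge means a transform that already defines `field.syntax.version` itself fails fast instead of silently overriding one definition with the other.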





[GitHub] [kafka] C0urante commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1283676589


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.Map;
+
+/**
+ * Operations to update data values and schemas based on field paths.
+ * <p>
+ * See KIP-821.
+ *
+ * @see SingleFieldPath
+ * @see MultiFieldPaths
+ */
+public interface FieldPath {

Review Comment:
   Hmm... I think the issue with trying to unify around a single `FieldPath` (or `FieldPaths`) interface is that we lose a lot of the guarantees that the Java type system can provide us. For example, in the PR you linked, the implementation of `MultiFieldPaths::fieldFrom` doesn't really guarantee that the user provided a single field, it just operates on a single arbitrarily-selected field, even if the user provided several fields. And I'm not sure of any case where an SMT designed to operate on a single field at a time would prefer to use an internal API designed to operate on several fields instead of a single field (e.g., `FieldsFrom` instead of `FieldFrom`).
   
   Also, even though there's technically nothing preventing users from building against this internal API, it's still not public API and we should not encourage users to build against, e.g., the `FieldPath` interface since it's subject to change at any time, at which point plugins that depend on it would become incompatible with some versions of Kafka Connect.
   
   Ultimately, I don't see us gaining much from this interface right now, since the ways we use the `SingleFieldPath` and `MultiFieldPaths` differ enough (at least as far as the Java type system is concerned) that it's difficult to define a single shared interface for the two of them.
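   Here is a hypothetical, heavily simplified sketch of the type-system point. With separate single-path and multi-path types, the return type tells the caller whether it gets one value or one value per field. An SMT that accepts only `SinglePath` also cannot be configured with several fields by mistake. The names are invented and unrelated to the PR's classes:

   ```java
   import java.util.Arrays;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical, simplified types (not the PR's classes) showing how separate
   // single-path and multi-path types let the compiler enforce how many fields an SMT gets.
   public class PathTypesSketch {

       // Exactly one field: callers get back exactly one value
       static final class SinglePath {
           private final String field;
           SinglePath(String field) { this.field = field; }
           Object valueFrom(Map<String, Object> data) { return data.get(field); }
       }

       // Several fields: callers get back one value per configured field
       static final class MultiPaths {
           private final List<String> fields;
           MultiPaths(String... fields) { this.fields = Arrays.asList(fields); }
           Map<String, Object> valuesFrom(Map<String, Object> data) {
               Map<String, Object> values = new LinkedHashMap<>();
               for (String f : fields) {
                   values.put(f, data.get(f));
               }
               return values;
           }
       }

       public static void main(String[] args) {
           Map<String, Object> data = new LinkedHashMap<>();
           data.put("id", 7);
           data.put("name", "kafka");
           System.out.println(new SinglePath("id").valueFrom(data));          // 7
           System.out.println(new MultiPaths("id", "name").valuesFrom(data)); // {id=7, name=kafka}
       }
   }
   ```

   A shared interface would need one method shape for both types. A multi-path implementation of a single-value method would then have to pick one field arbitrarily, which is the concern raised above.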





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

Posted by GitBox <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r975537150


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a Connect key or value structure (e.g. {@code Struct} or
+ * {@code Map<String, Object>}).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, they can be escaped by wrapping the field names in backticks.</li>
+ * <li>If field names contain backticks at wrapping positions (beginning or end of path, before or after dots), then those backticks need to be
+ * escaped with a backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+    private static final String BACKTICK = "`";
+    private static final String DOT = ".";
+    public static final char BACKTICK_CHAR = '`';
+    public static final char DOT_CHAR = '.';
+    public static final char BACKSLASH_CHAR = '\\';
+
+    private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+    private final String[] path;
+
+    public static FieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    public static FieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field field path expression
+     * @param version  field syntax version
+     */
+    public static FieldPath of(String field, FieldSyntaxVersion version) {
+        if (field == null || field.isEmpty() || version.equals(FieldSyntaxVersion.V1)) {
+            return new FieldPath(field, version);
+        } else {
+            if (PATHS_CACHE.containsKey(field)) {
+                return PATHS_CACHE.get(field);
+            } else {
+                final FieldPath fieldPath = new FieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    FieldPath(String path, FieldSyntaxVersion version) {
+        if (path == null || path.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {path};
+                    break;
+                case V2:
+                    // if no dots or wrapping backticks are used, then return path with single step
+                    if (!path.contains(DOT)
+                        && !(path.startsWith(BACKTICK) && path.endsWith(
+                        BACKTICK))) {
+                        this.path = new String[] {path};
+                    } else {
+                        // prepare for tracking path steps
+                        final List<String> steps = new ArrayList<>();
+                        final StringBuilder s = new StringBuilder(
+                            path); // avoid creating new string on changes
+
+                        while (s.length() > 0) { // until the whole path is traversed
+                            // process backtick pair if any
+                            if (s.charAt(0) == BACKTICK_CHAR) {
+                                s.deleteCharAt(0);
+
+                                // find backtick pair
+                                int idx = 0;
+                                while (idx >= 0) {
+                                    idx = s.indexOf(BACKTICK, idx);
+                                    if (idx == -1) {
+                                        throw new IllegalArgumentException(
+                                            "Incomplete backtick pair at [...]`" + s);
+                                    }
+                                    if (idx != s.length() - 1) { // non-global backtick
+                                        if (s.charAt(idx + 1) != DOT_CHAR
+                                            || (idx > 0 && s.charAt(idx - 1)
+                                            == BACKSLASH_CHAR)) { // not wrapped or escaped
+                                            idx++; // move index forward and keep searching
+                                        } else { // it's end pair
+                                            steps.add(
+                                                checkIncompleteBacktickPair(s.substring(0, idx)));
+                                            s.delete(0, idx + 2); // rm backtick and dot
+                                            break;
+                                        }
+                                    } else { // global backtick
+                                        steps.add(checkIncompleteBacktickPair(s.substring(0, idx)));
+                                        s.delete(0, s.length());
+                                        break;
+                                    }
+                                }
+                            } else { // process path dots
+                                final int atDot = s.indexOf(DOT);
+                                if (atDot > 0) { // get step and move forward
+                                    steps.add(checkIncompleteBacktickPair(s.substring(0, atDot)));
+                                    s.delete(0, atDot + 1);
+                                } else { // add all
+                                    steps.add(checkIncompleteBacktickPair(s.toString()));
+                                    s.delete(0, s.length());
+                                }
+                            }
+                        }
+
+                        this.path = steps.toArray(new String[0]);
+                    }
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown syntax version: " + version);
+            }
+        }
+    }
+
+    private String checkIncompleteBacktickPair(String field) {

Review Comment:
   Good call. I have renamed it to `escapeBackticks` and added docs.





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1168015733


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/MultiFieldPaths.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Multiple field paths to access data objects ({@code Struct} or {@code Map}) efficiently,
+ * instead of using {@see SingleFieldPath} individually.
+ * <p>
+ * If the SMT requires accessing a single field on the same data object,
+ * use {@code FieldPath} instead.
+ * <p>
+ * Invariants:
+ * <li>
+ *     <ul>Tree values contain either a nested tree or a field path</ul>
+ *     <ul>A tree cannot contain paths that are a subset of other paths (e.g. foo and foo.bar in V2 should collide and fail)</ul>
+ * </li>
+ *
+ * See KIP-821.
+ *
+ * @see SingleFieldPath
+ * @see FieldSyntaxVersion
+ */
+public class MultiFieldPaths implements FieldPath {
+
+    final Map<String, Object> pathTree;
+    final List<SingleFieldPath> paths;
+
+    MultiFieldPaths(List<SingleFieldPath> paths) {
+        this.paths = paths.stream().filter(Objects::nonNull).collect(Collectors.toList());
+        pathTree = buildPathTree(this.paths, 0, new HashMap<>());
+    }
+
+    public static Builder newBuilder(FieldSyntaxVersion syntaxVersion) {
+        return new Builder(syntaxVersion);
+    }
+
+    public static MultiFieldPaths of(SingleFieldPath path) {
+        return new MultiFieldPaths(Collections.singletonList(path));
+    }
+
+    public static MultiFieldPaths of(SingleFieldPath... paths) {
+        return new MultiFieldPaths(Arrays.asList(paths));
+    }
+
+    public static MultiFieldPaths of(List<SingleFieldPath> paths) {
+        return new MultiFieldPaths(paths);
+    }
+
+    public static MultiFieldPaths of(Set<String> fields, FieldSyntaxVersion syntaxVersion) {
+        return new MultiFieldPaths(fields.stream()
+                .map(f -> SingleFieldPath.of(f, syntaxVersion))
+                .collect(Collectors.toList()));
+    }
+
+    public static MultiFieldPaths of(List<String> fields, FieldSyntaxVersion syntaxVersion) {
+        return new MultiFieldPaths(fields.stream()
+                .map(f -> SingleFieldPath.of(f, syntaxVersion))
+                .collect(Collectors.toList()));
+    }
+
+    Map<String, Object> buildPathTree(List<SingleFieldPath> paths, int stepIdx, Map<String, Object> pathTree) {
+        if (paths.size() == 1) { // optimize for paths with a single member
+            SingleFieldPath path = paths.get(0);
+            if (path != null) {
+                if (path.stepAt(stepIdx + 1) == null) { // if last path step
+                    pathTree.put(path.stepAt(stepIdx), path);
+                } else {
+                    pathTree.put(path.stepAt(stepIdx),
+                            buildPathTree(paths, stepIdx + 1, new HashMap<>()));
+                }
+            }
+        } else {
+            // group paths by prefix
+            final Map<String, List<SingleFieldPath>> groups = new HashMap<>();
+            for (SingleFieldPath path : paths) {
+                if (path != null) {
+                    String step = path.stepAt(stepIdx);
+                    if (step != null) {
+                        groups.computeIfPresent(step, (s, fieldPaths) -> {
+                            for (SingleFieldPath other : fieldPaths) {
+                                // avoid overlapping paths
+                                if (!path.equals(other)
+                                        && (other.stepAt(stepIdx + 1) == null
+                                        || path.stepAt(stepIdx + 1) == null)) {
+                                    throw new IllegalArgumentException(
+                                            "Path " + other + " and " + path + " are overlapping. "
+                                                    + "Paths need to point to leaf values");

Review Comment:
   Oh, I hadn't thought about this one! I agree it's a valid use case.
   This will require keeping "user" in the tree.
   
   I gave it a try, using an empty string to represent "user" itself in the tree:
   - "user"
     - ""
     - "email"
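A minimal sketch of the empty-string idea, assuming each path has already been split into steps and the tree is made of plain nested `HashMap`s whose leaves hold the full dotted path. `PathTreeSketch`, `insert` and `SELF` are hypothetical names, not part of the PR. When a path ends at a node that a longer path also passes through, it is stored under the `""` key of that node, whichever order the paths arrive in:

```java
import java.util.HashMap;
import java.util.Map;

public class PathTreeSketch {

    // Hypothetical marker key: "" means "the parent path itself is also a target".
    static final String SELF = "";

    /** Inserts a path, already split into steps, into a tree of nested maps. */
    @SuppressWarnings("unchecked")
    static void insert(Map<String, Object> tree, String[] steps) {
        String fullPath = String.join(".", steps);
        Map<String, Object> node = tree;
        for (int i = 0; i < steps.length; i++) {
            Object existing = node.get(steps[i]);
            if (i == steps.length - 1) {
                if (existing instanceof Map) {
                    // a longer path already passes through here: record this one under ""
                    ((Map<String, Object>) existing).put(SELF, fullPath);
                } else {
                    node.put(steps[i], fullPath);
                }
                return;
            }
            if (existing instanceof Map) {
                node = (Map<String, Object>) existing;
            } else {
                Map<String, Object> child = new HashMap<>();
                if (existing != null) {
                    // a shorter path ended here: move its leaf under ""
                    child.put(SELF, existing);
                }
                node.put(steps[i], child);
                node = child;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> tree = new HashMap<>();
        insert(tree, new String[] {"user"});
        insert(tree, new String[] {"user", "email"});
        System.out.println(tree);
    }
}
```

Inserting "user" and "user.email" in either order yields a "user" node that maps "" to "user" and "email" to "user.email", so a lookup can tell both targets apart.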





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1168009443


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/MultiFieldPaths.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Multiple field paths to access data objects ({@code Struct} or {@code Map}) efficiently,
+ * instead of using {@see SingleFieldPath} individually.
+ * <p>
+ * If the SMT requires accessing a single field on the same data object,
+ * use {@code FieldPath} instead.
+ * <p>
+ * Invariants:
+ * <li>
+ *     <ul>Tree values contain either a nested tree or a field path</ul>
+ *     <ul>A tree cannot contain paths that are a subset of other paths (e.g. foo and foo.bar in V2 should collide and fail)</ul>
+ * </li>
+ *
+ * See KIP-821.
+ *
+ * @see SingleFieldPath
+ * @see FieldSyntaxVersion
+ */
+public class MultiFieldPaths implements FieldPath {
+
+    final Map<String, Object> pathTree;
+    final List<SingleFieldPath> paths;
+
+    MultiFieldPaths(List<SingleFieldPath> paths) {
+        this.paths = paths.stream().filter(Objects::nonNull).collect(Collectors.toList());
+        pathTree = buildPathTree(this.paths, 0, new HashMap<>());
+    }
+
+    public static Builder newBuilder(FieldSyntaxVersion syntaxVersion) {
+        return new Builder(syntaxVersion);
+    }
+
+    public static MultiFieldPaths of(SingleFieldPath path) {
+        return new MultiFieldPaths(Collections.singletonList(path));
+    }
+
+    public static MultiFieldPaths of(SingleFieldPath... paths) {
+        return new MultiFieldPaths(Arrays.asList(paths));
+    }
+
+    public static MultiFieldPaths of(List<SingleFieldPath> paths) {
+        return new MultiFieldPaths(paths);
+    }
+
+    public static MultiFieldPaths of(Set<String> fields, FieldSyntaxVersion syntaxVersion) {
+        return new MultiFieldPaths(fields.stream()
+                .map(f -> SingleFieldPath.of(f, syntaxVersion))
+                .collect(Collectors.toList()));
+    }
+
+    public static MultiFieldPaths of(List<String> fields, FieldSyntaxVersion syntaxVersion) {
+        return new MultiFieldPaths(fields.stream()
+                .map(f -> SingleFieldPath.of(f, syntaxVersion))
+                .collect(Collectors.toList()));
+    }
+
+    Map<String, Object> buildPathTree(List<SingleFieldPath> paths, int stepIdx, Map<String, Object> pathTree) {
+        if (paths.size() == 1) { // optimize for paths with a single member

Review Comment:
   Agreed; this is similar to the previous premature optimization.
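One way the single-member shortcut could be dropped: a single recursive routine that groups paths by their step at the current depth and recurses into each group, so a lone path is simply a group of size one. This sketch assumes paths are non-empty `List<String>` step lists (not `SingleFieldPath`) and keeps the strict behaviour of the code above, rejecting a path that is a prefix of another instead of using the empty-string key. `BuildPathTreeSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BuildPathTreeSketch {

    /**
     * Groups paths by their step at depth stepIdx and recurses into each group.
     * Leaves hold the full dotted path; inner nodes hold nested maps.
     */
    static Map<String, Object> buildPathTree(List<List<String>> paths, int stepIdx) {
        // group paths by the step at the current depth, preserving input order
        Map<String, List<List<String>>> groups = new LinkedHashMap<>();
        for (List<String> path : paths) {
            groups.computeIfAbsent(path.get(stepIdx), k -> new ArrayList<>()).add(path);
        }
        Map<String, Object> tree = new HashMap<>();
        for (Map.Entry<String, List<List<String>>> group : groups.entrySet()) {
            List<String> leaf = null;                    // a path ending at this step
            List<List<String>> deeper = new ArrayList<>(); // paths continuing past it
            for (List<String> p : group.getValue()) {
                if (p.size() == stepIdx + 1) {
                    leaf = p;
                } else {
                    deeper.add(p);
                }
            }
            if (leaf != null && !deeper.isEmpty()) {
                throw new IllegalArgumentException("Path " + String.join(".", leaf)
                        + " overlaps with " + String.join(".", deeper.get(0)));
            }
            tree.put(group.getKey(), leaf != null
                    ? String.join(".", leaf)
                    : buildPathTree(deeper, stepIdx + 1));
        }
        return tree;
    }
}
```

Duplicate paths collapse into the same leaf, and there is no separate code path for a single member to keep in sync with the general case.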





[GitHub] [kafka] C0urante commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1300510546


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A FieldPath is composed of one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).
+ * <p>
+ * If the SMT requires accessing multiple fields on the same data object,
+ * use {@code FieldPaths} instead.
+ * <p>
+ * The field path semantics are defined by the syntax version {@code FieldSyntaxVersion}.
+ * <p>
+ * Paths are calculated once and cached for further access.
+ * <p>
+ * Invariants:
+ * <li>
+ *     <ul>A field path can contain one or more steps</ul>
+ * </li>
+ *
+ * See KIP-821.
+ *
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath implements FieldPath {
+
+    private static final char BACKTICK_CHAR = '`';
+    private static final char DOT_CHAR = '.';
+    private static final char BACKSLASH_CHAR = '\\';
+
+    private static final Cache<String, SingleFieldPath> PATHS_CACHE = new SynchronizedCache<>(new LRUCache<>(16));
+
+    private final String[] path;
+
+    static SingleFieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    static SingleFieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field   field path expression
+     * @param version field syntax version
+     */
+    public static SingleFieldPath of(String field, FieldSyntaxVersion version) {
+        if ((field == null || field.isEmpty()) // empty path
+                || version.equals(FieldSyntaxVersion.V1)) { // or V1
+            return new SingleFieldPath(field, version);
+        } else { // use cache when V2
+            final SingleFieldPath found = PATHS_CACHE.get(field);
+            if (found != null) {
+                return found;
+            } else {
+                final SingleFieldPath fieldPath = new SingleFieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        if (pathText == null || pathText.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {pathText};
+                    break;
+                case V2:
+                    path = buildFieldPathV2(pathText);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown syntax version: " + version);
+            }
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {

Review Comment:
   Think this is still unaddressed?





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

Posted by GitBox <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r975521554


##########
connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java:
##########
@@ -24,20 +24,31 @@
 
 /**
  * Single message transformation for Kafka Connect record types.
- *
+ * <br/>
  * Connectors can be configured with transformations to make lightweight message-at-a-time modifications.
  */
 public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
 
+    String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+    String FIELD_SYNTAX_VERSION_DOC = "Defines the version of the syntax to access fields. "
+        + "If set to `V1`, then the field paths are limited to accessing the elements at the root level of the struct or map. "
+        + "If set to `V2`, the syntax will support accessing nested elements. To access nested elements, "
+        + "dotted notation is used. If dots are already included in the field name, then backtick pairs "
+        + "can be used to wrap field names containing dots. "
+        + "e.g. to access elements from a struct/map named \"foo.bar\", "
+        + "the following format can be used to access its elements: \"`foo.bar`.baz\".";
+
+    String FIELD_SYNTAX_VERSION_DEFAULT_VALUE = "V1";

Review Comment:
   Agree, moving it to `FieldSyntaxVersion`
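A rough sketch of what the constants could look like once they live on the enum. Only the config name `field.syntax.version`, the `V1` default and the V1/V2 semantics come from the diff above; the reworded doc string, the `fromConfig` helper and its case-insensitive parsing are assumptions for illustration, not the PR's code.

```java
import java.util.Locale;

/** Hypothetical sketch: the syntax-version config constants moved onto the enum. */
public enum FieldSyntaxVersion {
    V1,
    V2;

    public static final String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
    public static final String FIELD_SYNTAX_VERSION_DEFAULT_VALUE = V1.name();
    public static final String FIELD_SYNTAX_VERSION_DOC = "Defines the version of the syntax to access fields. "
            + "If set to `V1`, field paths can only access elements at the root level of the struct or map. "
            + "If set to `V2`, nested elements can be accessed using dotted notation. "
            + "Field names that contain dots can be wrapped in backticks, "
            + "e.g. \"`foo.bar`.baz\" accesses \"baz\" inside the struct/map named \"foo.bar\".";

    /** Parses a config value such as "V2" or "v2"; a null value falls back to the default. */
    public static FieldSyntaxVersion fromConfig(String value) {
        String name = value == null
                ? FIELD_SYNTAX_VERSION_DEFAULT_VALUE
                : value.trim().toUpperCase(Locale.ROOT);
        try {
            return valueOf(name);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown syntax version: " + value, e);
        }
    }
}
```

Keeping the name, doc and default next to the enum means each SMT can reference one definition instead of the `Transformation` interface carrying transform-specific config.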





[GitHub] [kafka] C0urante commented on pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #12637:
URL: https://github.com/apache/kafka/pull/12637#issuecomment-1485251519

   I'm also wondering about the spec for V2 field syntax. The KIP states that "if the backticks are in a wrapping position (opening or closing a field name), then need to be escaped with backslash", but I think we might actually want something like "if a backtick is followed by a dot in the field name, the backtick should be escaped with a backslash".
   
   For example:
   
   - With a field name of <code>a.b</code>, the user would have to enclose the field in backticks to handle the dot in the name, so they would specify <code>`a.b`</code> in the connector config
   - With a field name of <code>`a.b</code>, there's no dot after the backtick in the name, so nothing special is necessary; the user can just add that backtick to the field name in the connector config and use <code>``a.b`</code>
   - Similarly, with a field name of <code>a.b`</code>, the user could just specify <code>`a.b``</code> in the connector config
   - But, with a field name of <code>a`.b</code>, the user would need to signal to us that the <code>`.</code> part of that name is not the end of the field name, so they would have to specify <code>`a\`.b`</code> in the connector config
   
   Does that work, or is the current logic/spec correct?
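The proposed rule can be sketched as a small parser: a backtick-wrapped step ends at the first backtick that is either the last character of the path, or is followed by a dot and not preceded by a backslash; inside a wrapped step, the sequence backslash-backtick-dot is turned back into backtick-dot. This is a hypothetical illustration (`V2PathParserSketch` is not the PR's `buildFieldPathV2`) and it ignores edge cases such as empty trailing steps.

```java
import java.util.ArrayList;
import java.util.List;

public final class V2PathParserSketch {

    /** Splits a V2 field path into steps under the proposed backtick rule. */
    static List<String> parse(String path) {
        List<String> steps = new ArrayList<>();
        int i = 0;
        while (i < path.length()) {
            if (path.charAt(i) == '`') {
                // backtick-wrapped step: look for the closing backtick
                int close = -1;
                for (int j = i + 1; j < path.length(); j++) {
                    if (path.charAt(j) != '`') {
                        continue;
                    }
                    boolean atEnd = j == path.length() - 1;
                    boolean beforeDot = !atEnd && path.charAt(j + 1) == '.';
                    boolean escaped = path.charAt(j - 1) == '\\';
                    if (atEnd || (beforeDot && !escaped)) {
                        close = j;
                        break;
                    }
                }
                if (close < 0) {
                    throw new IllegalArgumentException("Incomplete backtick pair in: " + path);
                }
                // the only escape sequence is backslash-backtick-dot
                steps.add(path.substring(i + 1, close).replace("\\`.", "`."));
                i = close + 2; // skip the closing backtick and the dot after it
            } else {
                // plain step: runs until the next dot or the end of the path
                int dot = path.indexOf('.', i);
                int end = dot < 0 ? path.length() : dot;
                steps.add(path.substring(i, end));
                i = end + 1;
            }
        }
        return steps;
    }
}
```

Under this sketch each of the four config values listed above parses to a single step equal to the intended field name, and a mixed path such as <code>`foo.bar`.baz</code> yields the steps <code>foo.bar</code> and <code>baz</code>.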




[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1167636433


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A FieldPath is composed of one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).
+ * <p>
+ * If the SMT requires accessing multiple fields on the same data object,
+ * use {@code FieldPaths} instead.
+ * <p>
+ * The field path semantics are defined by the syntax version {@code FieldSyntaxVersion}.
+ * <p>
+ * Paths are calculated once and cached for further access.
+ * <p>
+ * Invariants:
+ * <li>
+ *     <ul>A field path can contain one or more steps</ul>
+ * </li>
+ *
+ * See KIP-821.
+ *
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath implements FieldPath {
+
+    private static final char BACKTICK_CHAR = '`';
+    private static final char DOT_CHAR = '.';
+    private static final char BACKSLASH_CHAR = '\\';
+
+    private static final Cache<String, SingleFieldPath> PATHS_CACHE = new SynchronizedCache<>(new LRUCache<>(16));
+
+    private final String[] path;
+
+    static SingleFieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    static SingleFieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field   field path expression
+     * @param version field syntax version
+     */
+    public static SingleFieldPath of(String field, FieldSyntaxVersion version) {
+        if ((field == null || field.isEmpty()) // empty path
+                || version.equals(FieldSyntaxVersion.V1)) { // or V1
+            return new SingleFieldPath(field, version);
+        } else { // use cache when V2
+            final SingleFieldPath found = PATHS_CACHE.get(field);
+            if (found != null) {
+                return found;
+            } else {
+                final SingleFieldPath fieldPath = new SingleFieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        if (pathText == null || pathText.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {pathText};
+                    break;
+                case V2:
+                    path = buildFieldPathV2(pathText);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown syntax version: " + version);
+            }
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {
+        // if no dots or wrapping backticks are used, then return path with single step
+        if (!pathText.contains(String.valueOf(DOT_CHAR))) {
+            return new String[] {pathText};
+        } else {
+            // prepare for tracking path steps
+            final List<String> steps = new ArrayList<>();
+            // avoid creating new string on changes
+            final StringBuilder s = new StringBuilder(pathText);
+
+            while (s.length() > 0) { // until path is traversed
+                // start processing backtick pair, if any
+                if (s.charAt(0) == BACKTICK_CHAR) {
+                    s.deleteCharAt(0);
+
+                    // find backtick closing pair
+                    int idx = 0;
+                    while (idx >= 0) {
+                        idx = s.indexOf(String.valueOf(BACKTICK_CHAR), idx);
+                        if (idx == -1) { // if not found, fail
+                            throw new IllegalArgumentException("Incomplete backtick pair at [...]`" + s);
+                        }
+                        // check that it is not escaped or wrapped in another backticks pair
+                        if (idx < s.length() - 1 // not wrapping the whole field path
+                                && (s.charAt(idx + 1) != DOT_CHAR // not wrapping
+                                || s.charAt(idx - 1) == BACKSLASH_CHAR)) { // ... or escaped
+                            idx++; // move index forward and keep searching
+                        } else { // it's the closing pair
+                            steps.add(escapeBackticks(s.substring(0, idx)));
+                            s.delete(0, idx + 2); // rm backtick and dot
+                            break;
+                        }
+                    }
+                } else { // process dots in path
+                    final int atDot = s.indexOf(String.valueOf(DOT_CHAR));
+                    if (atDot > 0) { // get path step and move forward
+                        steps.add(escapeBackticks(s.substring(0, atDot)));
+                        s.delete(0, atDot + 1);
+                    } else { // add all
+                        steps.add(escapeBackticks(s.toString()));
+                        s.delete(0, s.length());
+                    }
+                }
+            }
+
+            return steps.toArray(new String[0]);
+        }
+    }
+
+    /**
+     * Return field name with escaped backticks, if any.
+     *
+     * @param field potentially containing backticks
+     * @throws IllegalArgumentException when there are incomplete backtick pairs
+     */
+    private String escapeBackticks(String field) {

Review Comment:
   Going with `processEscapedBackticks`; let's check it in the next round.
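A hedged sketch of what `processEscapedBackticks` might do for an unwrapped step, following its Javadoc (escape handling plus an `IllegalArgumentException` on incomplete backtick pairs) and the KIP rule that backticks in a wrapping position must be escaped with a backslash. The class name and the exact behaviour are assumptions, not the PR's code; backticks in the middle of a name are left untouched.

```java
public final class EscapedBackticksSketch {

    /**
     * Hypothetical: validates an unwrapped step and unescapes a backslash-escaped
     * backtick at the start or end of the name.
     */
    static String processEscapedBackticks(String step) {
        // an unescaped backtick at either edge looks like half of a wrapping pair
        boolean openingUnescaped = step.startsWith("`");
        boolean closingUnescaped = step.endsWith("`") && !step.endsWith("\\`");
        if (openingUnescaped || closingUnescaped) {
            throw new IllegalArgumentException("Incomplete backtick pair in step: " + step);
        }
        String result = step;
        if (result.startsWith("\\`")) {
            result = result.substring(1); // drop the escaping backslash
        }
        if (result.endsWith("\\`")) {
            result = result.substring(0, result.length() - 2) + "`";
        }
        return result;
    }
}
```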





Re: [PR] KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures [kafka]

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on PR #12637:
URL: https://github.com/apache/kafka/pull/12637#issuecomment-1947312971

   @mimaison thanks for the reminder! I checked with @C0urante and decided to open a new PR to continue this review as the current one just got too long for anyone to grasp. 
   Opened a new one: https://github.com/apache/kafka/pull/15379 where we can continue the review -- will close this one. 




[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1167632114


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/MultiFieldPaths.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Multiple field paths to access data objects ({@code Struct} or {@code Map}) efficiently,
+ * instead of using {@see SingleFieldPath} individually.
+ * <p>
+ * If the SMT requires accessing a single field on the same data object,
+ * use {@code FieldPath} instead.
+ * <p>
+ * Invariants:
+ * <li>
+ *     <ul>Tree values contain either a nested tree or a field path</ul>
+ *     <ul>A tree cannot contain paths that are a subset of other paths (e.g. foo and foo.bar in V2 should collide and fail)</ul>
+ * </li>
+ *
+ * See KIP-821.
+ *
+ * @see SingleFieldPath
+ * @see FieldSyntaxVersion
+ */
+public class MultiFieldPaths implements FieldPath {
+
+    final Map<String, Object> pathTree;
+    final List<SingleFieldPath> paths;
+
+    MultiFieldPaths(List<SingleFieldPath> paths) {
+        this.paths = paths.stream().filter(Objects::nonNull).collect(Collectors.toList());
+        pathTree = buildPathTree(this.paths, 0, new HashMap<>());
+    }
+
+    public static Builder newBuilder(FieldSyntaxVersion syntaxVersion) {
+        return new Builder(syntaxVersion);
+    }
+
+    public static MultiFieldPaths of(SingleFieldPath path) {
+        return new MultiFieldPaths(Collections.singletonList(path));
+    }
+
+    public static MultiFieldPaths of(SingleFieldPath... paths) {
+        return new MultiFieldPaths(Arrays.asList(paths));
+    }
+
+    public static MultiFieldPaths of(List<SingleFieldPath> paths) {
+        return new MultiFieldPaths(paths);
+    }
+
+    public static MultiFieldPaths of(Set<String> fields, FieldSyntaxVersion syntaxVersion) {
+        return new MultiFieldPaths(fields.stream()
+                .map(f -> SingleFieldPath.of(f, syntaxVersion))
+                .collect(Collectors.toList()));
+    }
+
+    public static MultiFieldPaths of(List<String> fields, FieldSyntaxVersion syntaxVersion) {
+        return new MultiFieldPaths(fields.stream()
+                .map(f -> SingleFieldPath.of(f, syntaxVersion))
+                .collect(Collectors.toList()));
+    }
+
+    Map<String, Object> buildPathTree(List<SingleFieldPath> paths, int stepIdx, Map<String, Object> pathTree) {
+        if (paths.size() == 1) { // optimize for paths with a single member
+            SingleFieldPath path = paths.get(0);
+            if (path != null) {
+                if (path.stepAt(stepIdx + 1) == null) { // if last path step
+                    pathTree.put(path.stepAt(stepIdx), path);
+                } else {
+                    pathTree.put(path.stepAt(stepIdx),
+                            buildPathTree(paths, stepIdx + 1, new HashMap<>()));
+                }
+            }
+        } else {
+            // group paths by prefix
+            final Map<String, List<SingleFieldPath>> groups = new HashMap<>();
+            for (SingleFieldPath path : paths) {
+                if (path != null) {
+                    String step = path.stepAt(stepIdx);
+                    if (step != null) {
+                        groups.computeIfPresent(step, (s, fieldPaths) -> {
+                            for (SingleFieldPath other : fieldPaths) {
+                                // avoid overlapping paths
+                                if (!path.equals(other)
+                                        && (other.stepAt(stepIdx + 1) == null
+                                        || path.stepAt(stepIdx + 1) == null)) {
+                                    throw new IllegalArgumentException(
+                                            "Path " + other + " and " + path + " are overlapping. "
+                                                    + "Paths need to point to leaf values");
+                                }
+                            }
+                            if (!fieldPaths.contains(path)) {
+                                fieldPaths.add(path);
+                            }
+                            return fieldPaths;
+                        });
+                        groups.computeIfAbsent(step, s -> {
+                            List<SingleFieldPath> fieldPaths = new ArrayList<>();
+                            fieldPaths.add(path);
+                            return fieldPaths;
+                        });
+                    }
+                }
+            }

Review Comment:
   I like this! I was trying to guard against users adding overlapping paths, but by using Sets we could conflate them and keep only the child paths.
   Giving it a try now. 
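The Set-based approach mentioned in the reply above could look roughly like the following. This is a minimal sketch, not the PR's code: it assumes each path is already split into steps (`List<String>`), drops any path that is a strict prefix of another so that `foo` is conflated into `foo.bar`, and stores the leaf as a dotted string for readability where the PR stores a `SingleFieldPath`. `PathTreeSketch` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class PathTreeSketch {
    // Builds a nested map: intermediate steps map to child trees,
    // leaf steps map to the full path (as a dotted string).
    static Map<String, Object> buildTree(Set<List<String>> paths) {
        Map<String, Object> root = new HashMap<>();
        for (List<String> path : paths) {
            if (path.isEmpty() || isStrictPrefixOfAnother(path, paths)) {
                continue; // e.g. "foo" is conflated into "foo.bar": keep only the child path
            }
            Map<String, Object> node = root;
            for (int i = 0; i < path.size() - 1; i++) {
                // Safe cast: prefixes were removed, so no leaf sits where a subtree is expected.
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) node.computeIfAbsent(
                        path.get(i), k -> new HashMap<String, Object>());
                node = child;
            }
            node.put(path.get(path.size() - 1), String.join(".", path));
        }
        return root;
    }

    private static boolean isStrictPrefixOfAnother(List<String> path, Set<List<String>> paths) {
        for (List<String> other : paths) {
            if (other.size() > path.size() && other.subList(0, path.size()).equals(path)) {
                return true;
            }
        }
        return false;
    }
}
```

Given the paths `foo`, `foo.bar` and `baz`, the resulting tree has two top-level entries: `foo` (a subtree containing only `bar`) and `baz`. Using a `Set` also makes exact duplicates disappear without the explicit `contains` check in the quoted `buildPathTree`.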



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1167635238


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A FieldPath is composed of one or more field names, known as steps,
+ * used to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).
+ * <p>
+ * If the SMT requires accessing multiple fields on the same data object,
+ * use {@code MultiFieldPaths} instead.
+ * <p>
+ * The field path semantics are defined by the syntax version {@code FieldSyntaxVersion}.
+ * <p>
+ * Paths are calculated once and cached for further access.
+ * <p>
+ * Invariants:
+ * <ul>
+ *     <li>A field path can contain one or more steps</li>
+ * </ul>
+ *
+ * See KIP-821.
+ *
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath implements FieldPath {
+
+    private static final char BACKTICK_CHAR = '`';
+    private static final char DOT_CHAR = '.';
+    private static final char BACKSLASH_CHAR = '\\';
+
+    private static final Cache<String, SingleFieldPath> PATHS_CACHE = new SynchronizedCache<>(new LRUCache<>(16));

Review Comment:
   Agreed. At first I thought paths were going to be built at runtime, but since they are built at construction time, the cache adds little value. Removing it.
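To illustrate why a cache adds little once paths are parsed up front, here is a minimal sketch in which the expression is split into steps exactly once, in the constructor, and every lookup reuses that array. `ParsedPath` is a hypothetical class, and its naive split on `.` deliberately ignores the backtick escaping that V2 supports.

```java
import java.util.Map;

final class ParsedPath {
    private final String[] steps; // parsed once, then reused for every record

    ParsedPath(String expression) {
        // Naive split for illustration only: no backtick or escape handling.
        this.steps = expression.split("\\.");
    }

    // Walks nested maps step by step; returns null if a step is missing
    // or an intermediate value is not a map.
    Object valueFrom(Map<String, Object> root) {
        Object current = root;
        for (String step : steps) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(step);
        }
        return current;
    }
}
```

If an SMT builds its path objects once in `configure()`, the per-record cost is just the walk over `steps`, so a shared static cache only adds synchronization and memory without saving any parsing.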





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1296374731


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.Map;
+
+/**
+ * Operations to update data values and schemas based on field paths.
+ * <p>
+ * See KIP-821.
+ *
+ * @see SingleFieldPath
+ * @see MultiFieldPaths
+ */
+public interface FieldPath {

Review Comment:
   Thanks for sharing the reasoning. I agree with your proposal and will be removing the FieldPath interface 👍🏽 





[GitHub] [kafka] jeqo commented on pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on PR #12637:
URL: https://github.com/apache/kafka/pull/12637#issuecomment-1510491725

   @C0urante thanks for such a generous review! I learnt a lot while applying suggestions.
   
   I think I covered most of them, and left a few open with comments explaining why. 
   
   About the last suggestion and refactor, would you mind checking its format? I think GitHub Markdown has messed up the comment and it's hard for me to understand the proposal.




Re: [PR] KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures [kafka]

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo closed pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures
URL: https://github.com/apache/kafka/pull/12637




[GitHub] [kafka] yashmayya commented on a diff in pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

Posted by GitBox <gi...@apache.org>.
yashmayya commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r974127639


##########
connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java:
##########
@@ -24,20 +24,31 @@
 
 /**
  * Single message transformation for Kafka Connect record types.
- *
+ * <br/>
  * Connectors can be configured with transformations to make lightweight message-at-a-time modifications.
  */
 public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
 
+    String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+    String FIELD_SYNTAX_VERSION_DOC = "Defines the version of the syntax to access fields. "
+        + "If set to `V1`, then the field paths are limited to accessing elements at the root level of the struct or map. "
+        + "If set to `V2`, the syntax will support accessing nested elements. o access nested elements, "
+        + "dotted notation is used. If dots are already included in the field name, then backtick pairs "
+        + "can be used to wrap field names containing dots. "
+        + "e.g. to access elements from a struct/map named \"foo.bar\", "
+        + "the following format can be used to access its elements: \"`foo.bar`.baz\".";

Review Comment:
   I think this should be `from a field in a struct/map named ...`  instead?
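For readers unfamiliar with the V2 notation described in `FIELD_SYNTAX_VERSION_DOC`, here is a simplified sketch of how such an expression could be split into steps: dots separate steps, and a step wrapped in backticks may itself contain dots, with the closing backtick being one that is followed by a dot or ends the path. It is not the PR's parser; `V2PathSketch` is a hypothetical name, and the sketch ignores the backslash escaping of backticks discussed elsewhere in this review.

```java
import java.util.ArrayList;
import java.util.List;

final class V2PathSketch {
    static List<String> steps(String path) {
        List<String> steps = new ArrayList<>();
        int i = 0;
        while (i < path.length()) {
            if (path.charAt(i) == '`') {
                // Closing backtick: one that is followed by '.' or ends the path.
                int close = path.indexOf('`', i + 1);
                while (close != -1 && close + 1 < path.length() && path.charAt(close + 1) != '.') {
                    close = path.indexOf('`', close + 1);
                }
                if (close == -1) {
                    throw new IllegalArgumentException("Incomplete backtick pair in " + path);
                }
                steps.add(path.substring(i + 1, close));
                i = close + 2; // skip the closing backtick and the following dot
            } else {
                int dot = path.indexOf('.', i);
                int end = dot == -1 ? path.length() : dot;
                steps.add(path.substring(i, end));
                i = end + 1; // skip the dot
            }
        }
        return steps;
    }
}
```

With this sketch, `` steps("`foo.bar`.baz") `` returns `[foo.bar, baz]`, i.e. the field `baz` inside a field whose name is `foo.bar`, while `steps("foo.bar")` returns `[foo, bar]`.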



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java:
##########
@@ -39,33 +41,35 @@
     private static final String FIELD_CONFIG = "field";
 
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract.");
+            .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract.")
+            .define(FIELD_SYNTAX_VERSION_CONFIG, ConfigDef.Type.STRING, FIELD_SYNTAX_VERSION_DEFAULT_VALUE, ConfigDef.Importance.HIGH, FIELD_SYNTAX_VERSION_DOC);

Review Comment:
   Could we add a validator here to ensure that `FIELD_SYNTAX_VERSION_CONFIG` belongs to `FieldSyntaxVersion.values()` (maybe case-insensitive match if desired)?
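One possible shape for the suggested validation is a case-insensitive lookup against the enum constants, failing with a message that lists the accepted values. This is a self-contained sketch: `SyntaxVersionSketch` stands in for the PR's `FieldSyntaxVersion`, and `fromConfig` is a hypothetical helper. In the SMT itself, the same check would typically be wired into the `define(...)` call through a `ConfigDef.Validator`.

```java
import java.util.Arrays;

// Stand-in for the PR's FieldSyntaxVersion enum.
enum SyntaxVersionSketch {
    V1, V2;

    // Hypothetical helper: case-insensitive parse with a descriptive error.
    static SyntaxVersionSketch fromConfig(String value) {
        if (value != null) {
            for (SyntaxVersionSketch v : values()) {
                if (v.name().equalsIgnoreCase(value.trim())) {
                    return v;
                }
            }
        }
        throw new IllegalArgumentException("Unknown field syntax version '" + value
                + "'; expected one of " + Arrays.toString(values()) + " (case-insensitive)");
    }
}
```

Rejecting an unknown value at configuration time means a typo such as `V3` fails when the connector starts, rather than silently falling back to the default behaviour.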



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value (e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of path, before or after dots), then backticks need to be
+ * escaped by backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+    private static final String BACKTICK = "`";
+    private static final String DOT = ".";
+    public static final char BACKTICK_CHAR = '`';
+    public static final char DOT_CHAR = '.';
+    public static final char BACKSLASH_CHAR = '\\';
+
+    private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+    private final String[] path;
+
+    public static FieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    public static FieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field field path expression
+     * @param version  field syntax version
+     */
+    public static FieldPath of(String field, FieldSyntaxVersion version) {
+        if (field == null || field.isEmpty() || version.equals(FieldSyntaxVersion.V1)) {
+            return new FieldPath(field, version);
+        } else {
+            if (PATHS_CACHE.containsKey(field)) {
+                return PATHS_CACHE.get(field);
+            } else {
+                final FieldPath fieldPath = new FieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    FieldPath(String path, FieldSyntaxVersion version) {
+        if (path == null || path.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {path};
+                    break;
+                case V2:
+                    // if no dots or wrapping backticks are used, then return path with single step
+                    if (!path.contains(DOT)
+                        && !(path.startsWith(BACKTICK) && path.endsWith(
+                        BACKTICK))) {
+                        this.path = new String[] {path};
+                    } else {
+                        // prepare for tracking path steps
+                        final List<String> steps = new ArrayList<>();
+                        final StringBuilder s = new StringBuilder(
+                            path); // avoid creating new string on changes

Review Comment:
   ```suggestion
                           final StringBuilder s = new StringBuilder(path); // avoid creating new string on changes
   ```
   
   nit: unnecessary newline



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value (e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of path, before or after dots), then backticks need to be
+ * escaped by backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+    private static final String BACKTICK = "`";
+    private static final String DOT = ".";
+    public static final char BACKTICK_CHAR = '`';
+    public static final char DOT_CHAR = '.';
+    public static final char BACKSLASH_CHAR = '\\';
+
+    private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+    private final String[] path;
+
+    public static FieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    public static FieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field field path expression
+     * @param version  field syntax version
+     */
+    public static FieldPath of(String field, FieldSyntaxVersion version) {
+        if (field == null || field.isEmpty() || version.equals(FieldSyntaxVersion.V1)) {
+            return new FieldPath(field, version);
+        } else {
+            if (PATHS_CACHE.containsKey(field)) {
+                return PATHS_CACHE.get(field);
+            } else {
+                final FieldPath fieldPath = new FieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    FieldPath(String path, FieldSyntaxVersion version) {
+        if (path == null || path.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {path};
+                    break;
+                case V2:
+                    // if no dots or wrapping backticks are used, then return path with single step
+                    if (!path.contains(DOT)
+                        && !(path.startsWith(BACKTICK) && path.endsWith(
+                        BACKTICK))) {

Review Comment:
   Why do we need this check?
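One plausible reading of the check asked about above (the author's answer is not part of this excerpt) is that it is a fast path: an expression with no dots and no wrapping backticks can only be a single step, so the character-by-character parsing can be skipped. The backtick half of the condition matters because an expression such as `` `foo` `` contains no dot but still needs its backticks stripped. A minimal sketch of just that condition, with a hypothetical class and method name:

```java
final class FastPathSketch {
    // Mirrors the condition in the quoted constructor: true when the whole
    // expression can be used as a single step without further parsing.
    static boolean isSingleStep(String path) {
        return !path.contains(".")
                && !(path.startsWith("`") && path.endsWith("`"));
    }
}
```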



##########
connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java:
##########
@@ -24,20 +24,31 @@
 
 /**
  * Single message transformation for Kafka Connect record types.
- *
+ * <br/>
  * Connectors can be configured with transformations to make lightweight message-at-a-time modifications.
  */
 public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
 
+    String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+    String FIELD_SYNTAX_VERSION_DOC = "Defines the version of the syntax to access fields. "
+        + "If set to `V1`, then the field paths are limited to accessing elements at the root level of the struct or map. "
+        + "If set to `V2`, the syntax will support accessing nested elements. o access nested elements, "
+        + "dotted notation is used. If dots are already included in the field name, then backtick pairs "
+        + "can be used to wrap field names containing dots. "
+        + "e.g. to access elements from a struct/map named \"foo.bar\", "
+        + "the following format can be used to access its elements: \"`foo.bar`.baz\".";
+
+    String FIELD_SYNTAX_VERSION_DEFAULT_VALUE = "V1";

Review Comment:
   I think this (and the above config too) can be moved to `FieldPath` or `FieldSyntaxVersion` in the `connect:transforms` subproject since they don't need to be added to the public interface? And then you can avoid using the string form "V1" directly here. 
   
   Or else, if the intention *is* to add it to the public interface and allow custom SMT builders to also use this new notation, then should we consider moving the `FieldPath` and related classes to the `connect:api` subproject?
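If the constants were kept out of the public `Transformation` interface as suggested, one option is to define them next to the enum in `connect:transforms`, so the default is derived from the enum constant rather than written as the string "V1". A minimal sketch; `SyntaxVersionConfigSketch` and its members are hypothetical names, and the doc string is abbreviated.

```java
// Hypothetical stand-in for FieldSyntaxVersion, with its config metadata
// kept alongside the enum instead of on the public Transformation interface.
enum SyntaxVersionConfigSketch {
    V1, V2;

    static final String CONFIG = "field.syntax.version";
    static final String DOC = "Defines the version of the syntax to access fields.";
    // Derived from the enum constant instead of a hard-coded "V1" literal.
    static final String DEFAULT_VALUE = V1.name();
}
```

Each SMT's `ConfigDef` could then reference `SyntaxVersionConfigSketch.CONFIG` and `DEFAULT_VALUE` without the interface in `connect:api` having to change.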



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value (e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of path, before or after dots), then backticks need to be

Review Comment:
   ```suggestion
    * <li>If field names contain backticks at wrapping positions (beginning or end of path, before or after dots), then backticks need to be
   ```



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value (e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of path, before or after dots), then backticks need to be
+ * escaped by backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+    private static final String BACKTICK = "`";
+    private static final String DOT = ".";
+    public static final char BACKTICK_CHAR = '`';
+    public static final char DOT_CHAR = '.';
+    public static final char BACKSLASH_CHAR = '\\';
+
+    private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+    private final String[] path;
+
+    public static FieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    public static FieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field field path expression
+     * @param version  field syntax version
+     */
+    public static FieldPath of(String field, FieldSyntaxVersion version) {
+        if (field == null || field.isEmpty() || version.equals(FieldSyntaxVersion.V1)) {
+            return new FieldPath(field, version);
+        } else {
+            if (PATHS_CACHE.containsKey(field)) {
+                return PATHS_CACHE.get(field);
+            } else {
+                final FieldPath fieldPath = new FieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    FieldPath(String path, FieldSyntaxVersion version) {
+        if (path == null || path.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {path};
+                    break;
+                case V2:
+                    // if no dots or wrapping backticks are used, then return path with single step
+                    if (!path.contains(DOT)
+                        && !(path.startsWith(BACKTICK) && path.endsWith(
+                        BACKTICK))) {
+                        this.path = new String[] {path};
+                    } else {
+                        // prepare for tracking path steps
+                        final List<String> steps = new ArrayList<>();
+                        final StringBuilder s = new StringBuilder(
+                            path); // avoid creating new string on changes
+
+                        while (s.length() > 0) { // until path is traversed
+                            // process backtick pair if any
+                            if (s.charAt(0) == BACKTICK_CHAR) {
+                                s.deleteCharAt(0);
+
+                                // find backtick pair
+                                int idx = 0;
+                                while (idx >= 0) {
+                                    idx = s.indexOf(BACKTICK, idx);
+                                    if (idx == -1) {
+                                        throw new IllegalArgumentException(
+                                            "Incomplete backtick pair at [...]`" + s);
+                                    }
+                                    if (idx != s.length() - 1) { // non-global backtick
+                                        if (s.charAt(idx + 1) != DOT_CHAR
+                                            || s.charAt(idx - 1)
+                                            == BACKSLASH_CHAR) { // not wrapped or escaped
+                                            idx++; // move index forward and keep searching
+                                        } else { // it's end pair
+                                            steps.add(
+                                                checkIncompleteBacktickPair(s.substring(0, idx)));
+                                            s.delete(0, idx + 2); // rm backtick and dot
+                                            break;
+                                        }
+                                    } else { // global backtick

Review Comment:
   Is a "global" backtick referring to a pair of backticks that wrap the whole field path? I don't see why that requires special handling.
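One possible reading of the "global" branch is that it handles a closing backtick that is the last remaining character: there is no following character to compare with the dot, so `s.charAt(idx + 1)` would be out of bounds. The following is a minimal sketch under that assumption. The class and method names are hypothetical and it only mirrors the branch structure of the diff; the `idx > 0` guard is an addition of this sketch, because as far as I can tell the diff would evaluate `s.charAt(idx - 1)` with `idx == 0` for an empty wrapped step.

```java
public final class ClosingBacktickSketch {
    private static final char BACKTICK = '`';
    private static final char DOT = '.';
    private static final char BACKSLASH = '\\';

    /**
     * Hypothetical helper mirroring the branches in the diff above.
     * s is the remaining path after the opening backtick was deleted;
     * idx is the position of a candidate closing backtick in s.
     */
    static boolean closesStep(CharSequence s, int idx) {
        if (s.charAt(idx) != BACKTICK) {
            throw new IllegalArgumentException("No backtick at index " + idx);
        }
        if (idx == s.length() - 1) {
            // "Global" case: nothing follows this backtick, so there is no dot to
            // check and s.charAt(idx + 1) would throw. It closes the last step.
            return true;
        }
        boolean followedByDot = s.charAt(idx + 1) == DOT;
        // idx > 0 guards charAt(idx - 1) for an empty wrapped step, e.g. the path "``.foo".
        boolean escaped = idx > 0 && s.charAt(idx - 1) == BACKSLASH;
        return followedByDot && !escaped;
    }

    public static void main(String[] args) {
        // Remaining input of "foo.`bar.baz`" once "foo." and the opening backtick are consumed.
        System.out.println(closesStep("bar.baz`", 7));    // true: backtick ends the path
        System.out.println(closesStep("bar`.baz", 3));    // true: backtick followed by a dot
        System.out.println(closesStep("ba`r.baz`", 2));   // false: followed by 'r'
        System.out.println(closesStep("bar\\`.baz`", 4)); // false: escaped by a backslash
    }
}
```

If that reading is right, the end-of-path case could be folded into the general check (treat "end of input" like "followed by a dot"), which would remove the separate branch.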



##########
connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java:
##########
@@ -24,20 +24,31 @@
 
 /**
  * Single message transformation for Kafka Connect record types.
- *
+ * <br/>
  * Connectors can be configured with transformations to make lightweight message-at-a-time modifications.
  */
 public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
 
+    String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
+    String FIELD_SYNTAX_VERSION_DOC = "Defines the version of the syntax to access fields. "
+        + "If set to `V1`, then the field paths are limited to access the elements at the root level of the struct or map."
+        + "If set to `V2`, the syntax will support accessing nested elements. o access nested elements, "

Review Comment:
   ```suggestion
           + "If set to `V2`, the syntax will support accessing nested elements. To access nested elements, "
   ```



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value (e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of path, before or after dots), then backticks need to be
+ * escaped by backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+    private static final String BACKTICK = "`";
+    private static final String DOT = ".";
+    public static final char BACKTICK_CHAR = '`';
+    public static final char DOT_CHAR = '.';
+    public static final char BACKSLASH_CHAR = '\\';
+
+    private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+    private final String[] path;
+
+    public static FieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    public static FieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }

Review Comment:
   nit: these can be package-private if they are only used for testing.



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/FieldPath.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+/**
+ * Represents a path to a field within a structure within a Connect key/value (e.g. Struct or
+ * Map<String, Object>).
+ * <ul>
+ * <li>It follows a dotted notation to represent nested values.</li>
+ * <li>If field names contain dots, can be escaped by wrapping field names with backticks.</li>
+ * <li>If field names contain dots at wrapping positions (beginning or end of path, before or after dots), then backticks need to be
+ * escaped by backslash.</li>
+ * </ul>
+ * Paths are calculated once and cached for further access.
+ */
+public class FieldPath {
+
+    private static final String BACKTICK = "`";
+    private static final String DOT = ".";
+    public static final char BACKTICK_CHAR = '`';
+    public static final char DOT_CHAR = '.';
+    public static final char BACKSLASH_CHAR = '\\';
+
+    private static final Map<String, FieldPath> PATHS_CACHE = new HashMap<>();
+
+    private final String[] path;
+
+    public static FieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    public static FieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field field path expression
+     * @param version  field syntax version
+     */
+    public static FieldPath of(String field, FieldSyntaxVersion version) {
+        if (field == null || field.isEmpty() || version.equals(FieldSyntaxVersion.V1)) {
+            return new FieldPath(field, version);
+        } else {
+            if (PATHS_CACHE.containsKey(field)) {
+                return PATHS_CACHE.get(field);
+            } else {
+                final FieldPath fieldPath = new FieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    FieldPath(String path, FieldSyntaxVersion version) {
+        if (path == null || path.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {path};
+                    break;
+                case V2:
+                    // if no dots or wrapping backticks are used, then return path with single step
+                    if (!path.contains(DOT)
+                        && !(path.startsWith(BACKTICK) && path.endsWith(
+                        BACKTICK))) {
+                        this.path = new String[] {path};
+                    } else {
+                        // prepare for tracking path steps
+                        final List<String> steps = new ArrayList<>();
+                        final StringBuilder s = new StringBuilder(
+                            path); // avoid creating new string on changes
+
+                        while (s.length() > 0) { // until path is traverse
+                            // process backtick pair if any
+                            if (s.charAt(0) == BACKTICK_CHAR) {
+                                s.deleteCharAt(0);
+
+                                // find backtick pair
+                                int idx = 0;
+                                while (idx >= 0) {
+                                    idx = s.indexOf(BACKTICK, idx);
+                                    if (idx == -1) {
+                                        throw new IllegalArgumentException(
+                                            "Incomplete backtick pair at [...]`" + s);
+                                    }
+                                    if (idx != s.length() - 1) { // non-global backtick
+                                        if (s.charAt(idx + 1) != DOT_CHAR
+                                            || s.charAt(idx - 1)
+                                            == BACKSLASH_CHAR) { // not wrapped or escaped
+                                            idx++; // move index forward and keep searching
+                                        } else { // it's end pair
+                                            steps.add(
+                                                checkIncompleteBacktickPair(s.substring(0, idx)));
+                                            s.delete(0, idx + 2); // rm backtick and dot
+                                            break;
+                                        }
+                                    } else { // global backtick
+                                        steps.add(checkIncompleteBacktickPair(s.substring(0, idx)));
+                                        s.delete(0, s.length());
+                                        break;
+                                    }
+                                }
+                            } else { // process path dots
+                                final int atDot = s.indexOf(DOT);
+                                if (atDot > 0) { // get step and move forward
+                                    steps.add(checkIncompleteBacktickPair(s.substring(0, atDot)));
+                                    s.delete(0, atDot + 1);
+                                } else { // add all
+                                    steps.add(checkIncompleteBacktickPair(s.toString()));
+                                    s.delete(0, s.length());
+                                }
+                            }
+                        }
+
+                        this.path = steps.toArray(new String[0]);
+                    }
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown syntax version: " + version);
+            }
+        }
+    }
+
+    private String checkIncompleteBacktickPair(String field) {

Review Comment:
   Would it be possible to add a doc comment for this method? Also, `checkIncompleteBacktickPair` seems like a misnomer for a method that returns a String.



##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathTest.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.junit.jupiter.api.Test;
+
+class FieldPathTest {
+    final static String[] EMPTY_PATH = new String[]{};
+
+    @Test void shouldBuildV1WithDotsAndBacktickPair() {
+        assertArrayEquals(new String[] {"foo.bar.baz"}, FieldPath.ofV1("foo.bar.baz").path());
+        assertArrayEquals(new String[] {"foo.`bar.baz`"}, FieldPath.ofV1("foo.`bar.baz`").path());
+    }
+
+    @Test void shouldBuildV2WithEmptyPath() {
+        assertArrayEquals(EMPTY_PATH, FieldPath.of("", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WithNullPath() {
+        assertArrayEquals(EMPTY_PATH, FieldPath.of(null, FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WithoutDots() {
+        assertArrayEquals(new String[] {"foobarbaz"}, FieldPath.of("foobarbaz", FieldSyntaxVersion.V2).path());
+    }
+    @Test void shouldBuildV2WithoutWrappingBackticks() {
+        assertArrayEquals(new String[] {"foo`bar`baz"}, FieldPath.of("foo`bar`baz", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WhenIncludesDots() {
+        assertArrayEquals(new String[] {"foo", "bar", "baz"}, FieldPath.of("foo.bar.baz", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WhenIncludesDotsAndBacktickPair() {
+        assertArrayEquals(new String[] {"foo", "bar.baz"}, FieldPath.of("foo.`bar.baz`", FieldSyntaxVersion.V2).path());
+        assertArrayEquals(new String[] {"foo", "bar", "baz"}, FieldPath.of("foo.`bar`.baz", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2AndIgnoreBackticksThatAreNotWrapping() {
+        assertArrayEquals(new String[] {"foo", "ba`r.baz"}, FieldPath.of("foo.`ba`r.baz`", FieldSyntaxVersion.V2).path());
+        assertArrayEquals(new String[] {"foo", "ba`r", "baz"}, FieldPath.of("foo.ba`r.baz", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2AndEscapeBackticks() {
+        assertArrayEquals(new String[] {"foo", "bar`.`baz"}, FieldPath.of("foo.`bar\\`.\\`baz`", FieldSyntaxVersion.V2).path());
+        assertArrayEquals(new String[] {"foo", "bar\\`.`baz"}, FieldPath.of("foo.`bar\\\\`.\\`baz`", FieldSyntaxVersion.V2).path());
+    }
+
+    @Test void shouldBuildV2WithBackticksWrappingBackticks() {

Review Comment:
   The rules defined in the KIP don't seem to cover this case (although there is an example of it in the subsequent section)?
   
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures#KIP821:ConnectTransformssupportfornestedstructures-Rules





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1167124072


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A FieldPath is composed by one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).a
+ * <p>
+ * If the SMT requires accessing multiple fields on the same data object,
+ * use {@code FieldPaths} instead.
+ * <p>
+ * The field path semantics are defined by the syntax version {@code FieldSyntaxVersion}.
+ * <p>
+ * Paths are calculated once and cached for further access.
+ * <p>
+ * Invariants:
+ * <li>
+ *     <ul>A field path can contain one or more steps</ul>
+ * </li>
+ *
+ * See KIP-821.
+ *
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath implements FieldPath {
+
+    private static final char BACKTICK_CHAR = '`';
+    private static final char DOT_CHAR = '.';
+    private static final char BACKSLASH_CHAR = '\\';
+
+    private static final Cache<String, SingleFieldPath> PATHS_CACHE = new SynchronizedCache<>(new LRUCache<>(16));
+
+    private final String[] path;
+
+    static SingleFieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    static SingleFieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field   field path expression
+     * @param version field syntax version
+     */
+    public static SingleFieldPath of(String field, FieldSyntaxVersion version) {
+        if ((field == null || field.isEmpty()) // empty path
+                || version.equals(FieldSyntaxVersion.V1)) { // or V1
+            return new SingleFieldPath(field, version);
+        } else { // use cache when V2
+            final SingleFieldPath found = PATHS_CACHE.get(field);
+            if (found != null) {
+                return found;
+            } else {
+                final SingleFieldPath fieldPath = new SingleFieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        if (pathText == null || pathText.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {pathText};
+                    break;
+                case V2:
+                    path = buildFieldPathV2(pathText);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown syntax version: " + version);
+            }
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {
+        // if no dots or wrapping backticks are used, then return path with single step
+        if (!pathText.contains(String.valueOf(DOT_CHAR))) {
+            return new String[] {pathText};
+        } else {
+            // prepare for tracking path steps
+            final List<String> steps = new ArrayList<>();
+            // avoid creating new string on changes
+            final StringBuilder s = new StringBuilder(pathText);
+
+            while (s.length() > 0) { // until path is traversed
+                // start processing backtick pair, if any
+                if (s.charAt(0) == BACKTICK_CHAR) {
+                    s.deleteCharAt(0);
+
+                    // find backtick closing pair
+                    int idx = 0;
+                    while (idx >= 0) {
+                        idx = s.indexOf(String.valueOf(BACKTICK_CHAR), idx);
+                        if (idx == -1) { // if not found, fail
+                            throw new IllegalArgumentException("Incomplete backtick pair at [...]`" + s);
+                        }
+                        // check that it is not escaped or wrapped in another backticks pair
+                        if (idx < s.length() - 1 // not wrapping the whole field path
+                                && (s.charAt(idx + 1) != DOT_CHAR // not wrapping
+                                || s.charAt(idx - 1) == BACKSLASH_CHAR)) { // ... or escaped
+                            idx++; // move index forward and keep searching
+                        } else { // it's the closing pair
+                            steps.add(escapeBackticks(s.substring(0, idx)));
+                            s.delete(0, idx + 2); // rm backtick and dot
+                            break;
+                        }
+                    }
+                } else { // process dots in path
+                    final int atDot = s.indexOf(String.valueOf(DOT_CHAR));
+                    if (atDot > 0) { // get path step and move forward
+                        steps.add(escapeBackticks(s.substring(0, atDot)));
+                        s.delete(0, atDot + 1);
+                    } else { // add all
+                        steps.add(escapeBackticks(s.toString()));
+                        s.delete(0, s.length());
+                    }
+                }
+            }
+
+            return steps.toArray(new String[0]);
+        }
+    }
+
+    /**
+     * Return field name with escaped backticks, if any.
+     *
+     * @param field potentially containing backticks
+     * @throws IllegalArgumentException when there are incomplete backtick pairs
+     */
+    private String escapeBackticks(String field) {

Review Comment:
   Would `removeEscapeBackticks` work better?
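For illustration, a minimal sketch of what a method under the suggested name could do. It assumes the only escape sequence is a backslash immediately before a backtick, which is consistent with the expectations in `shouldBuildV2AndEscapeBackticks` quoted earlier in this thread. Unlike `escapeBackticks` in the diff, this sketch does not check for incomplete backtick pairs; the class name is hypothetical.

```java
public final class RemoveEscapeBackticksSketch {

    /**
     * Returns the field name with escape backslashes removed: every backslash
     * that immediately precedes a backtick is dropped; all other characters are kept.
     * Assumption: no validation of incomplete backtick pairs is done here.
     */
    static String removeEscapeBackticks(String field) {
        StringBuilder out = new StringBuilder(field.length());
        for (int i = 0; i < field.length(); i++) {
            char c = field.charAt(i);
            boolean escapesBacktick = c == '\\'
                    && i + 1 < field.length()
                    && field.charAt(i + 1) == '`';
            if (!escapesBacktick) {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(removeEscapeBackticks("bar\\`.\\`baz"));   // bar`.`baz
        System.out.println(removeEscapeBackticks("bar\\\\`.\\`baz")); // bar\`.`baz
    }
}
```

The name then describes what the method returns (the unescaped field name) rather than the validation it performs along the way.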





[GitHub] [kafka] jeqo commented on pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on PR #12637:
URL: https://github.com/apache/kafka/pull/12637#issuecomment-1698145226

   Thanks, @C0urante!
   
   About [1], I agree with your proposal. It makes the notation more intuitive and reduces the cognitive load on users. I have updated the KIP and adjusted the notation to match your proposal.
   
   [1]: https://github.com/apache/kafka/pull/12637#issuecomment-1485251519




[GitHub] [kafka] C0urante commented on pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #12637:
URL: https://github.com/apache/kafka/pull/12637#issuecomment-1485252002

   I'm also wondering about the spec for V2 field syntax. The KIP states that "if the backticks are in a wrapping position (opening or closing a field name), then need to be escaped with backslash", but I think we might actually want something like "if a backtick is followed by a dot in the field name, the backtick should be escaped with a backslash".
   
   For example:
   
   - With a field name of <code>a.b</code>, the user would have to enclose the field in backticks to handle the dot in the name, so they would specify <code>`a.b`</code> in the connector config
   - With a field name of <code>`a.b</code>, there's no dot after the backtick in the name, so nothing special is necessary; the user can just add that backtick to the field name in the connector config and use <code>``a.b`</code>
   - Similarly, with a field name of <code>a.b`</code>, the user could just specify <code>`a.b``</code> in the connector config
   - But, with a field name of <code>a`.b</code>, the user would need to signal to us that the <code>`.</code> part of that name is not the end of the field name, so they would have to specify <code>`a\`.b`</code> in the connector config
   
   Does that work, or is the current logic/spec correct?
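To make the proposed rule concrete, here is a minimal Java sketch of a V2 parser that follows it. It is not the PR's implementation, the class and method names are hypothetical, and it assumes the backslash escape applies only to a backtick that is directly followed by a dot. It reproduces the four examples in the list above.

```java
import java.util.ArrayList;
import java.util.List;

public final class ProposedV2Syntax {

    /**
     * Hypothetical parser for the proposed rule. Inside a backtick-wrapped step,
     * a backtick closes the step when it is followed by a dot or ends the path;
     * the sequence backslash, backtick, dot is kept as a literal "`.".
     * Unwrapped steps are split on dots.
     */
    static List<String> parse(String path) {
        List<String> steps = new ArrayList<>();
        int n = path.length();
        int i = 0;
        while (i < n) {
            if (path.charAt(i) == '`') {
                StringBuilder step = new StringBuilder();
                int j = i + 1;
                while (true) {
                    if (j >= n) {
                        throw new IllegalArgumentException("Unclosed backtick in: " + path);
                    }
                    char c = path.charAt(j);
                    if (c == '\\' && j + 2 < n && path.charAt(j + 1) == '`' && path.charAt(j + 2) == '.') {
                        step.append("`.");  // escaped: this backtick does not close the step
                        j += 3;
                    } else if (c == '`' && (j == n - 1 || path.charAt(j + 1) == '.')) {
                        break;              // closing backtick
                    } else {
                        step.append(c);
                        j++;
                    }
                }
                steps.add(step.toString());
                i = j + 2;  // skip the closing backtick and the dot after it, if any
            } else {
                int dot = path.indexOf('.', i);
                int end = dot < 0 ? n : dot;
                steps.add(path.substring(i, end));
                i = end + 1;
            }
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(parse("`a.b`"));       // [a.b]
        System.out.println(parse("``a.b`"));      // [`a.b]
        System.out.println(parse("`a.b``"));      // [a.b`]
        System.out.println(parse("`a\\`.b`"));    // [a`.b]
        System.out.println(parse("foo.`a.b`.c")); // [foo, a.b, c]
    }
}
```

Under this rule a backtick needs escaping only in the one position where it would otherwise be ambiguous (directly before a dot), so leading and trailing backticks in a field name can be written as-is.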




[GitHub] [kafka] C0urante commented on a diff in pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1149375240


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A FieldPath is composed by one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).a
+ * <p>
+ * If the SMT requires accessing multiple fields on the same data object,
+ * use {@code FieldPaths} instead.
+ * <p>
+ * The field path semantics are defined by the syntax version {@code FieldSyntaxVersion}.
+ * <p>
+ * Paths are calculated once and cached for further access.
+ * <p>
+ * Invariants:
+ * <li>
+ *     <ul>A field path can contain one or more steps</ul>
+ * </li>
+ *
+ * See KIP-821.
+ *
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath implements FieldPath {
+
+    private static final char BACKTICK_CHAR = '`';
+    private static final char DOT_CHAR = '.';
+    private static final char BACKSLASH_CHAR = '\\';
+
+    private static final Cache<String, SingleFieldPath> PATHS_CACHE = new SynchronizedCache<>(new LRUCache<>(16));
+
+    private final String[] path;
+
+    static SingleFieldPath ofV1(String field) {
+        return of(field, FieldSyntaxVersion.V1);
+    }
+
+    static SingleFieldPath ofV2(String field) {
+        return of(field, FieldSyntaxVersion.V2);
+    }
+
+    /**
+     * If version is V2, then paths are cached for further access.
+     *
+     * @param field   field path expression
+     * @param version field syntax version
+     */
+    public static SingleFieldPath of(String field, FieldSyntaxVersion version) {
+        if ((field == null || field.isEmpty()) // empty path
+                || version.equals(FieldSyntaxVersion.V1)) { // or V1
+            return new SingleFieldPath(field, version);
+        } else { // use cache when V2
+            final SingleFieldPath found = PATHS_CACHE.get(field);
+            if (found != null) {
+                return found;
+            } else {
+                final SingleFieldPath fieldPath = new SingleFieldPath(field, version);
+                PATHS_CACHE.put(field, fieldPath);
+                return fieldPath;
+            }
+        }
+    }
+
+    SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        if (pathText == null || pathText.isEmpty()) { // empty path
+            this.path = new String[] {};
+        } else {
+            switch (version) {
+                case V1: // backward compatibility
+                    this.path = new String[] {pathText};
+                    break;
+                case V2:
+                    path = buildFieldPathV2(pathText);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown syntax version: " + version);
+            }
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {
+        // if no dots or wrapping backticks are used, then return path with single step
+        if (!pathText.contains(String.valueOf(DOT_CHAR))) {
+            return new String[] {pathText};
+        } else {
+            // prepare for tracking path steps
+            final List<String> steps = new ArrayList<>();
+            // avoid creating new string on changes
+            final StringBuilder s = new StringBuilder(pathText);
+
+            while (s.length() > 0) { // until path is traversed
+                // start processing backtick pair, if any
+                if (s.charAt(0) == BACKTICK_CHAR) {
+                    s.deleteCharAt(0);
+
+                    // find backtick closing pair
+                    int idx = 0;
+                    while (idx >= 0) {
+                        idx = s.indexOf(String.valueOf(BACKTICK_CHAR), idx);
+                        if (idx == -1) { // if not found, fail
+                            throw new IllegalArgumentException("Incomplete backtick pair at [...]`" + s);
+                        }
+                        // check that it is not escaped or wrapped in another backticks pair
+                        if (idx < s.length() - 1 // not wrapping the whole field path
+                                && (s.charAt(idx + 1) != DOT_CHAR // not wrapping
+                                || s.charAt(idx - 1) == BACKSLASH_CHAR)) { // ... or escaped
+                            idx++; // move index forward and keep searching
+                        } else { // it's the closing pair
+                            steps.add(escapeBackticks(s.substring(0, idx)));
+                            s.delete(0, idx + 2); // rm backtick and dot
+                            break;
+                        }
+                    }
+                } else { // process dots in path
+                    final int atDot = s.indexOf(String.valueOf(DOT_CHAR));
+                    if (atDot > 0) { // get path step and move forward
+                        steps.add(escapeBackticks(s.substring(0, atDot)));
+                        s.delete(0, atDot + 1);
+                    } else { // add all
+                        steps.add(escapeBackticks(s.toString()));
+                        s.delete(0, s.length());
+                    }
+                }
+            }
+
+            return steps.toArray(new String[0]);
+        }
+    }
+
+    /**
+     * Return field name with escaped backticks, if any.
+     *
+     * @param field potentially containing backticks
+     * @throws IllegalArgumentException when there are incomplete backtick pairs
+     */
+    private String escapeBackticks(String field) {
+        final StringBuilder s = new StringBuilder(field);
+        int idx = 0;
+        while (idx >= 0) {
+            idx = s.indexOf(String.valueOf(BACKTICK_CHAR), idx + 1);
+            if (idx >= 1 && s.length() > 2) {
+                if (s.charAt(idx - 1) == DOT_CHAR
+                        || (idx < s.length() - 1 && s.charAt(idx + 1) == DOT_CHAR
+                        && s.charAt(idx - 1) != BACKSLASH_CHAR)) {
+                    throw new IllegalArgumentException("Incomplete backtick pair at [...]" + field);
+                }
+                if (s.charAt(idx - 1) == BACKSLASH_CHAR) { // escape backtick
+                    if ((idx == 1 && s.charAt(0) == BACKSLASH_CHAR) // at the beginning: \`foo[...]
+                            || idx == s.length() - 1) { // at the end: [...]baz\`
+                        s.deleteCharAt(idx - 1);
+                    } else if ((idx > 2 && s.charAt(idx - 2) == DOT_CHAR) // after a dot: [...].\`bar[...]
+                            || (idx < s.length() - 1 && s.charAt(idx + 1) == DOT_CHAR)) { // before a dot: [...]bar\`.[...]
+                        s.deleteCharAt(idx - 1);
+                    }
+                }
+            }
+        }
+        return s.toString();
+    }

Review Comment:
   I took a stab at simplifying this whole method to 1) remove unnecessary `StringBuilder` instantiation, 2) handle all backtick escape logic on the fly (instead of in a follow-up step with the `escapeBackticks` method), and 3) follow the slightly-tweaked spec I outlined in my latest comment. Let me know what you think.
   
   ```suggestion
       private String[] buildFieldPathV2(String pathText) {
           final List<String> steps = new ArrayList<>();
           int idx = 0;
           while (idx < pathText.length() && idx >= 0) {
               if (pathText.charAt(idx) != BACKTICK_CHAR) {
                   final int start = idx;
                   idx = pathText.indexOf(String.valueOf(DOT_CHAR), idx);
                   if (idx > 0) { // get path step and move forward
                       String field = pathText.substring(start, idx);
                       steps.add(field);
                       idx++;
                   } else { // add all
                       String field = pathText.substring(start);
                       steps.add(field);
                   }
               } else {
                   StringBuilder field = new StringBuilder();
                   idx++;
                   int start = idx;
                   while (true) {
                       idx = pathText.indexOf(String.valueOf(BACKTICK_CHAR), idx);
                       if (idx == -1) { // if not found, fail
                           throw new IllegalArgumentException("Incomplete backtick pair in path: " + pathText);
                       }
                       boolean endOfPath = idx >= pathText.length() - 1;
                       if (endOfPath) {
                           field.append(pathText, start, idx);
                           // we've reached the end of the path, and the last character is the backtick
                           steps.add(field.toString());
                           idx++;
                           break;
                       }
                       if (pathText.charAt(idx + 1) != DOT_CHAR) {
                           // this backtick isn't followed by a dot; include it in the field name, but continue
                           // looking for a matching backtick that is followed by a dot
                           idx++;
                           continue;
                       }
                       if (pathText.charAt(idx - 1) == BACKSLASH_CHAR) {
                           // this backtick was escaped; include it in the field name, but continue
                           // looking for an unescaped matching backtick
                           field.append(pathText, start, idx - 1)
                                   .append(BACKTICK_CHAR);
                           idx++;
                           start = idx;
                           continue;
                       }
                       // we've found our matching backtick
                       field.append(pathText, start, idx);
                       steps.add(field.toString());
                       idx += 2; // increment by two to include the backtick and the dot after it
                       break;
                   }
               }
           }
           return steps.toArray(new String[0]);
       }
   ```
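   To make the parsing rules easier to follow, the stand-alone sketch below mirrors the suggested `buildFieldPathV2` loop and returns the steps for a few sample paths. The class `FieldPathV2Demo` and its `parse` method are hypothetical, and the constants are inlined. One deliberate deviation from the suggestion: the dot check uses `idx >= 0` instead of `idx > 0`. As far as I can tell, with `idx > 0` a path that starts with a dot (such as `.foo`) makes `indexOf` return `0`, the whole text is added as a step, and `idx` never advances, so the loop does not terminate.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical stand-alone sketch of the suggested V2 path parsing; not part of Kafka. */
   class FieldPathV2Demo {
       private static final char BACKTICK = '`';
       private static final char DOT = '.';
       private static final char BACKSLASH = '\\';

       static List<String> parse(String pathText) {
           final List<String> steps = new ArrayList<>();
           int idx = 0;
           while (idx >= 0 && idx < pathText.length()) {
               if (pathText.charAt(idx) != BACKTICK) {
                   // Unquoted step: read up to the next dot, or to the end of the path.
                   final int start = idx;
                   idx = pathText.indexOf(DOT, idx);
                   if (idx >= 0) { // ">= 0" so that a leading dot still advances idx
                       steps.add(pathText.substring(start, idx));
                       idx++;
                   } else {
                       steps.add(pathText.substring(start)); // idx == -1 ends the outer loop
                   }
               } else {
                   // Quoted step: read until a backtick that ends the path or precedes a dot.
                   final StringBuilder field = new StringBuilder();
                   idx++;
                   int start = idx;
                   while (true) {
                       idx = pathText.indexOf(BACKTICK, idx);
                       if (idx == -1) {
                           throw new IllegalArgumentException("Incomplete backtick pair in path: " + pathText);
                       }
                       if (idx == pathText.length() - 1) { // closing backtick at the end of the path
                           field.append(pathText, start, idx);
                           steps.add(field.toString());
                           idx++;
                           break;
                       }
                       if (pathText.charAt(idx + 1) != DOT) { // not a closing backtick: keep it in the name
                           idx++;
                           continue;
                       }
                       if (pathText.charAt(idx - 1) == BACKSLASH) { // escaped: drop the backslash, keep the backtick
                           field.append(pathText, start, idx - 1).append(BACKTICK);
                           idx++;
                           start = idx;
                           continue;
                       }
                       field.append(pathText, start, idx); // matching closing backtick
                       steps.add(field.toString());
                       idx += 2; // skip the backtick and the dot after it
                       break;
                   }
               }
           }
           return steps;
       }

       public static void main(String[] args) {
           System.out.println(parse("foo.bar.baz"));   // [foo, bar, baz]
           System.out.println(parse("`foo.bar`.baz")); // [foo.bar, baz]
           System.out.println(parse("`a\\`.b`"));      // [a`.b]
           System.out.println(parse(".foo"));          // [, foo]
       }
   }
   ```

   With this variant, `.foo` parses to an empty first step followed by `foo`; whether an empty step should be rejected instead is a spec question this sketch does not settle.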





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226: [connect:transform] Introduce support for nested structures

Posted by GitBox <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r975521064


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java:
##########
@@ -39,33 +41,35 @@
     private static final String FIELD_CONFIG = "field";
 
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract.");
+            .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract.")
+            .define(FIELD_SYNTAX_VERSION_CONFIG, ConfigDef.Type.STRING, FIELD_SYNTAX_VERSION_DEFAULT_VALUE, ConfigDef.Importance.HIGH, FIELD_SYNTAX_VERSION_DOC);

Review Comment:
   Great idea! Adding a validator and constants to `FieldSyntaxVersion`.
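   As an illustration only, here is a JDK-only sketch of what constants and validation on a `FieldSyntaxVersion` enum could look like. The constant names `FIELD_SYNTAX_VERSION_CONFIG` and `FIELD_SYNTAX_VERSION_DEFAULT_VALUE` come from the diff above; their values (`"field.syntax.version"`, `"V1"`), the `fromString` method and the case-insensitive matching are assumptions. Inside Kafka this check would more naturally be expressed as a `ConfigDef.Validator`.

   ```java
   import java.util.Arrays;
   import java.util.Locale;
   import java.util.stream.Collectors;

   /** Hypothetical sketch; config key, default value and method names are assumptions. */
   enum FieldSyntaxVersion {
       V1,
       V2;

       // Assumed key and default, mirroring the constant names referenced in ExtractField.
       public static final String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
       public static final String FIELD_SYNTAX_VERSION_DEFAULT_VALUE = "V1";

       /** Parses a config value, failing fast on anything other than a known version. */
       public static FieldSyntaxVersion fromString(String value) {
           if (value == null) {
               throw new IllegalArgumentException(FIELD_SYNTAX_VERSION_CONFIG + " must not be null");
           }
           try {
               return valueOf(value.trim().toUpperCase(Locale.ROOT));
           } catch (IllegalArgumentException e) {
               String allowed = Arrays.stream(values()).map(Enum::name).collect(Collectors.joining(", "));
               throw new IllegalArgumentException(
                   "Unknown " + FIELD_SYNTAX_VERSION_CONFIG + " '" + value + "'; allowed values: " + allowed, e);
           }
       }

       public static void main(String[] args) {
           System.out.println(fromString("v2"));                               // V2
           System.out.println(fromString(FIELD_SYNTAX_VERSION_DEFAULT_VALUE)); // V1
       }
   }
   ```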





Re: [PR] KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures [kafka]

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #12637:
URL: https://github.com/apache/kafka/pull/12637#issuecomment-1920935032

   @jeqo Are you planning to complete this work? This would be a nice KIP to have.




[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1309356167


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A FieldPath is composed of one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).
+ *
+ * <p>If the SMT requires accessing multiple fields on the same data object,
+ * use {@link MultiFieldPaths} instead.
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}.
+ *
+ * <p>Paths are calculated once and cached for further access.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final String[] path;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        Objects.requireNonNull(pathText, "Field path cannot be null");
+        switch (version) {
+            case V1: // backward compatibility
+                this.path = new String[] {pathText};
+                break;
+            case V2:
+                this.path = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " + version);
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {
+        final List<String> steps = new ArrayList<>();
+        int idx = 0;
+        while (idx < pathText.length() && idx >= 0) {
+            if (pathText.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = pathText.indexOf(String.valueOf(DOT), idx);
+                if (idx > 0) { // get path step and move forward

Review Comment:
   Sure, applying this and adding the test case.





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1309356098


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/MultiFieldPaths.java:
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Multiple field paths to access data objects ({@code Struct} or {@code Map}) efficiently,
+ * instead of multiple individual {@link SingleFieldPath single-field paths}.
+ *
+ * <p>If the SMT requires accessing a single field on the same data object,
+ * use {@link SingleFieldPath} instead.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see SingleFieldPath
+ * @see FieldSyntaxVersion
+ */
+public class MultiFieldPaths {
+    // Invariants:
+    // - Tree values contain either a nested tree or a field path
+    // - A tree can contain paths that are a subset of other paths
+    //   (e.g. foo and foo.bar in V2 would be kept)
+    final Map<String, Object> pathTree;

Review Comment:
   Sure! Thanks for introducing this structure. I gave it a shot; please check the latest commits and let me know if it's close to what you had in mind.
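   For context, a trie of field paths can be sketched with the JDK alone: each node records whether a configured path ends there and holds its children keyed by step name. This keeps the invariant stated in the diff that a tree may contain paths that are prefixes of other paths (`foo` and `foo.bar` are both kept). The `PathTrie` class and its methods are hypothetical, not the structure committed in the PR, and paths are passed in as already-parsed step lists.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical trie of field paths; each path is a list of already-parsed steps. */
   class PathTrie {
       private static final class Node {
           final Map<String, Node> children = new LinkedHashMap<>(); // insertion order for stable output
           boolean terminal; // true if a configured path ends at this node
       }

       private final Node root = new Node();

       void add(List<String> steps) {
           Node current = root;
           for (String step : steps) {
               current = current.children.computeIfAbsent(step, k -> new Node());
           }
           current.terminal = true;
       }

       boolean contains(List<String> steps) {
           Node current = root;
           for (String step : steps) {
               current = current.children.get(step);
               if (current == null) {
                   return false;
               }
           }
           return current.terminal; // a shared prefix alone does not count as a stored path
       }

       /** Lists every stored path, joining steps with dots for display only. */
       List<String> paths() {
           List<String> out = new ArrayList<>();
           collect(root, new ArrayList<>(), out);
           return out;
       }

       private static void collect(Node node, List<String> prefix, List<String> out) {
           if (node.terminal) {
               out.add(String.join(".", prefix));
           }
           for (Map.Entry<String, Node> e : node.children.entrySet()) {
               prefix.add(e.getKey());
               collect(e.getValue(), prefix, out);
               prefix.remove(prefix.size() - 1); // backtrack
           }
       }

       public static void main(String[] args) {
           PathTrie trie = new PathTrie();
           trie.add(Arrays.asList("foo"));
           trie.add(Arrays.asList("foo", "bar"));
           trie.add(Arrays.asList("baz"));
           System.out.println(trie.paths());                               // [foo, foo.bar, baz]
           System.out.println(trie.contains(Arrays.asList("foo", "bar"))); // true
       }
   }
   ```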



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A FieldPath is composed of one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).
+ *
+ * <p>If the SMT requires accessing multiple fields on the same data object,
+ * use {@link MultiFieldPaths} instead.
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}.
+ *
+ * <p>Paths are calculated once and cached for further access.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final String[] path;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        Objects.requireNonNull(pathText, "Field path cannot be null");
+        switch (version) {
+            case V1: // backward compatibility
+                this.path = new String[] {pathText};
+                break;
+            case V2:
+                this.path = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " + version);
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {
+        final List<String> steps = new ArrayList<>();
+        int idx = 0;
+        while (idx < pathText.length() && idx >= 0) {
+            if (pathText.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = pathText.indexOf(String.valueOf(DOT), idx);
+                if (idx > 0) { // get path step and move forward
+                    String field = pathText.substring(start, idx);
+                    steps.add(field);
+                    idx++;
+                } else { // add all
+                    String field = pathText.substring(start);
+                    steps.add(field);
+                }
+            } else {
+                StringBuilder field = new StringBuilder();
+                idx++;
+                int start = idx;
+                while (true) {
+                    idx = pathText.indexOf(String.valueOf(BACKTICK), idx);
+                    if (idx == -1) { // if not found, fail
+                        throw new IllegalArgumentException("Incomplete backtick pair in path: " + pathText);
+                    }
+
+                    boolean atEndOfPath = idx >= pathText.length() - 1;
+                    if (atEndOfPath) {
+                        field.append(pathText, start, idx);
+                        // we've reached the end of the path, and the last character is the backtick
+                        steps.add(field.toString());
+                        idx++;
+                        break;
+                    }
+
+                    boolean notFollowedByDot = pathText.charAt(idx + 1) != DOT;
+                    if (notFollowedByDot) {
+                        boolean afterABackslash = pathText.charAt(idx - 1) == BACKSLASH;
+                        if (afterABackslash) {
+                            // this backtick was escaped; include it in the field name, but continue
+                            // looking for an unescaped matching backtick
+                            field.append(pathText, start, idx - 1)
+                                .append(BACKTICK);
+
+                            idx++;
+                            start = idx;
+                        } else {
+                            // this backtick isn't followed by a dot; include it in the field name, but continue
+                            // looking for a matching backtick that is followed by a dot
+                            idx++;
+                        }
+                        continue;
+                    }
+
+                    boolean afterABackslash = pathText.charAt(idx - 1) == BACKSLASH;
+                    if (afterABackslash) {
+                        // this backtick was escaped; include it in the field name, but continue
+                        // looking for an unescaped matching backtick
+                        field.append(pathText, start, idx - 1)
+                            .append(BACKTICK);
+
+                        idx++;
+                        start = idx;
+                        continue;
+                    }
+                    // we've found our matching backtick
+                    field.append(pathText, start, idx);
+                    steps.add(field.toString());
+                    idx += 2; // increment by two to include the backtick and the dot after it
+                    break;
+                }
+
+
+            }
+        }
+        return steps.toArray(new String[0]);
+    }
+
+
+    /**
+     * Access a {@code Field} at the current path within a {@code Schema}.
+     * If the field is not found, then {@code null} is returned.
+     */
+    public Field fieldFrom(Schema schema) {
+        if (path.length == 1) {
+            return schema.field(path[0]);
+        } else {
+            Schema current = schema;
+            for (int i = 0; i < path.length; i++) {
+                if (current == null) {
+                    return null;
+                }
+                if (i == path.length - 1) { // get value
+                    return current.field(path[i]);
+                } else { // iterate
+                    current = current.field(path[i]).schema();

Review Comment:
   Yep, good catch. Adding null validations.
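   The crash spotted here is `current.field(path[i]).schema()` dereferencing `null` when an intermediate step does not exist. The sketch below shows a null-checked traversal using tiny stand-in types (`StubSchema`, `StubField`, both hypothetical) instead of Kafka's `Schema` and `Field`, so it runs with only the JDK. With the real Connect API it is probably worth also checking `schema.type() == Schema.Type.STRUCT` before calling `field(...)`, since, as far as I know, `ConnectSchema.field` throws a `DataException` on non-struct schemas rather than returning `null`; the `isStruct` flag stands in for that check.

   ```java
   import java.util.Arrays;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical stand-ins for Connect's Schema/Field, just enough to show the traversal. */
   class NullSafeFieldLookup {
       static final class StubField {
           final String name;
           final StubSchema schema;
           StubField(String name, StubSchema schema) { this.name = name; this.schema = schema; }
       }

       static final class StubSchema {
           final boolean isStruct;
           final Map<String, StubField> fields = new LinkedHashMap<>();
           StubSchema(boolean isStruct) { this.isStruct = isStruct; }
           StubSchema with(String name, StubSchema child) {
               fields.put(name, new StubField(name, child));
               return this;
           }
           StubField field(String name) { return fields.get(name); }
       }

       /** Returns the field at the path, or null if any step is missing or a parent is not a struct. */
       static StubField fieldFrom(StubSchema schema, List<String> path) {
           if (path.isEmpty()) {
               return null;
           }
           StubSchema current = schema;
           for (int i = 0; i < path.size() - 1; i++) {
               if (current == null || !current.isStruct) {
                   return null;
               }
               StubField step = current.field(path.get(i));
               if (step == null) {
                   return null; // the quoted code would call .schema() on null at this point
               }
               current = step.schema;
           }
           if (current == null || !current.isStruct) {
               return null;
           }
           return current.field(path.get(path.size() - 1));
       }

       public static void main(String[] args) {
           StubSchema root = new StubSchema(true)
               .with("foo", new StubSchema(true).with("bar", new StubSchema(false)));
           System.out.println(fieldFrom(root, Arrays.asList("foo", "bar")).name);  // bar
           System.out.println(fieldFrom(root, Arrays.asList("missing", "bar")));   // null
           System.out.println(fieldFrom(root, Arrays.asList("foo", "bar", "baz"))); // null
       }
   }
   ```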





[GitHub] [kafka] jeqo commented on a diff in pull request #12637: KAFKA-14226 (KIP-821): [connect:transform] Introduce support for nested structures

Posted by "jeqo (via GitHub)" <gi...@apache.org>.
jeqo commented on code in PR #12637:
URL: https://github.com/apache/kafka/pull/12637#discussion_r1309356364


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A FieldPath is composed of one or many field names, known as steps,
+ * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).
+ *
+ * <p>If the SMT requires accessing multiple fields on the same data object,
+ * use {@link MultiFieldPaths} instead.
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}.
+ *
+ * <p>Paths are calculated once and cached for further access.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final String[] path;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        Objects.requireNonNull(pathText, "Field path cannot be null");
+        switch (version) {
+            case V1: // backward compatibility
+                this.path = new String[] {pathText};
+                break;
+            case V2:
+                this.path = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " + version);
+        }
+    }
+
+    private String[] buildFieldPathV2(String pathText) {
+        final List<String> steps = new ArrayList<>();
+        int idx = 0;
+        while (idx < pathText.length() && idx >= 0) {
+            if (pathText.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = pathText.indexOf(String.valueOf(DOT), idx);
+                if (idx > 0) { // get path step and move forward
+                    String field = pathText.substring(start, idx);
+                    steps.add(field);
+                    idx++;
+                } else { // add all
+                    String field = pathText.substring(start);
+                    steps.add(field);
+                }
+            } else {
+                StringBuilder field = new StringBuilder();
+                idx++;
+                int start = idx;
+                while (true) {
+                    idx = pathText.indexOf(String.valueOf(BACKTICK), idx);
+                    if (idx == -1) { // if not found, fail
+                        throw new IllegalArgumentException("Incomplete backtick pair in path: " + pathText);
+                    }
+
+                    boolean atEndOfPath = idx >= pathText.length() - 1;
+                    if (atEndOfPath) {
+                        field.append(pathText, start, idx);
+                        // we've reached the end of the path, and the last character is the backtick
+                        steps.add(field.toString());
+                        idx++;
+                        break;
+                    }
+
+                    boolean notFollowedByDot = pathText.charAt(idx + 1) != DOT;
+                    if (notFollowedByDot) {
+                        boolean afterABackslash = pathText.charAt(idx - 1) == BACKSLASH;
+                        if (afterABackslash) {
+                            // this backtick was escaped; include it in the field name, but continue
+                            // looking for an unescaped matching backtick
+                            field.append(pathText, start, idx - 1)
+                                .append(BACKTICK);
+
+                            idx++;
+                            start = idx;
+                        } else {
+                            // this backtick isn't followed by a dot; include it in the field name, but continue
+                            // looking for a matching backtick that is followed by a dot
+                            idx++;
+                        }
+                        continue;
+                    }
+
+                    boolean afterABackslash = pathText.charAt(idx - 1) == BACKSLASH;
+                    if (afterABackslash) {
+                        // this backtick was escaped; include it in the field name, but continue
+                        // looking for an unescaped matching backtick
+                        field.append(pathText, start, idx - 1)
+                            .append(BACKTICK);
+
+                        idx++;
+                        start = idx;
+                        continue;
+                    }
+                    // we've found our matching backtick
+                    field.append(pathText, start, idx);
+                    steps.add(field.toString());
+                    idx += 2; // increment by two to include the backtick and the dot after it
+                    break;
+                }
+
+
+            }
+        }
+        return steps.toArray(new String[0]);
+    }
+
+
+    /**
+     * Access a {@code Field} at the current path within a {@code Schema}.
+     * If the field is not found, then {@code null} is returned.
+     */
+    public Field fieldFrom(Schema schema) {
+        if (path.length == 1) {
+            return schema.field(path[0]);
+        } else {
+            Schema current = schema;
+            for (int i = 0; i < path.length; i++) {
+                if (current == null) {
+                    return null;
+                }
+                if (i == path.length - 1) { // get value
+                    return current.field(path[i]);
+                } else { // iterate
+                    current = current.field(path[i]).schema();
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Access a value at the current path within a schema-based {@code Struct}.
+     * If the value is not found, then {@code null} is returned.
+     */
+    public Object valueFrom(Struct struct) {
+        if (path.length == 1) {
+            return struct.get(path[0]);
+        } else {
+            Struct current = struct;
+            for (int i = 0; i < path.length; i++) {
+                if (current == null) {
+                    return null;
+                }
+                if (i == path.length - 1) { // get value
+                    return current.get(path[i]);
+                } else { // iterate
+                    current = current.getStruct(path[i]);

Review Comment:
   Similar to the trie suggestion, I think a list may be simpler than an array. I gave it a try. 
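   To illustrate the list-based representation, here is a JDK-only sketch that keeps the parsed steps in an unmodifiable `List<String>` and walks a schemaless value (nested `Map<String, Object>`, one of the two data objects named in the class Javadoc), returning `null` when a step is missing or an intermediate value is not a map. The class `ListFieldPath` and its `valueFrom` signature are hypothetical; a `Struct`-based variant would follow the same loop, using `getStruct` for intermediate steps.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical list-based field path over schemaless (Map) values. */
   final class ListFieldPath {
       private final List<String> steps;

       ListFieldPath(List<String> steps) {
           // Defensive, unmodifiable copy: the path is computed once and reused for every record.
           this.steps = Collections.unmodifiableList(new ArrayList<>(steps));
       }

       List<String> steps() {
           return steps;
       }

       /** Returns the value at this path, or null if a step is missing or a parent is not a map. */
       Object valueFrom(Map<String, Object> root) {
           if (steps.isEmpty()) {
               return null;
           }
           Object current = root;
           for (String step : steps) {
               if (!(current instanceof Map)) {
                   return null; // also covers a null parent
               }
               current = ((Map<?, ?>) current).get(step);
           }
           return current;
       }

       public static void main(String[] args) {
           Map<String, Object> inner = new HashMap<>();
           inner.put("bar", 42);
           Map<String, Object> root = new HashMap<>();
           root.put("foo", inner);

           System.out.println(new ListFieldPath(Arrays.asList("foo", "bar")).valueFrom(root));      // 42
           System.out.println(new ListFieldPath(Arrays.asList("foo", "baz")).valueFrom(root));      // null
           System.out.println(new ListFieldPath(Arrays.asList("foo", "bar", "x")).valueFrom(root)); // null
       }
   }
   ```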


