You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "gharris1727 (via GitHub)" <gi...@apache.org> on 2023/02/01 21:57:57 UTC

[GitHub] [kafka] gharris1727 opened a new pull request, #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

gharris1727 opened a new pull request, #13184:
URL: https://github.com/apache/kafka/pull/13184

   Signed-off-by: Greg Harris <gr...@aiven.io>
   
   For several reasons, the PredicatedTransformation should not be a Transformation, see ticket for details.
   This refactor just naively wraps all transformations in predicated transformations without changing the name.
   We can change the name if the old one does not fit any more.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1119388653


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -531,8 +543,17 @@ private Converter badConverter() {
         return converter;
     }
 
+    private void mockSourceTransform() {
+        FaultyPassthrough<SourceRecord> faultyPassthrough = new FaultyPassthrough<>();
+        doReturn(FaultyPassthrough.class).when(transformationStage).transformClass();
+        when(transformationStage.apply(any())).thenAnswer(invocation -> faultyPassthrough.apply(invocation.getArgument(0)));
+    }

Review Comment:
   I added this in preparation for a later commit, but I agree it doesn't make much sense in the context of this PR. I'll push this out to that other PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1119389854


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -268,30 +268,30 @@ public boolean includeRecordDetailsInErrorLog() {
     }
 
     /**
-     * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
+     * Returns the initialized list of {@link TransformationStage} which are specified in {@link #TRANSFORMS_CONFIG}.
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformationStages() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
-        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {
                     String predicatePrefix = PREDICATES_PREFIX + predicateAlias + ".";
                     @SuppressWarnings("unchecked")
                     Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
                     predicate.configure(originalsWithPrefix(predicatePrefix));
-                    transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));
+                    transformations.add(new TransformationStage<>(predicate, negate != null && Boolean.parseBoolean(negate.toString()), transformation));

Review Comment:
   since there's a disagreement over this among reviewers, i'll leave this as it was on trunk.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1103257875


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,15 +500,25 @@ private void expectTopicCreation(String topic) {
         }
     }
 
+    private void mockSinkTransform() {
+        FaultyPassthrough<SinkRecord> faultyPassthrough = new FaultyPassthrough<>();
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) (Class<?>) FaultyPassthrough.class;

Review Comment:
   Unfortunately this is a case where the IDE linter is wrong, it won't compile without this cast:
   
   ```
   error: incompatible types: Class<FaultyPassthrough> cannot be converted to Class<? extends Transformation<?>>
           Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) FaultyPassthrough.class;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1119281474


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##########
@@ -16,63 +16,58 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import java.util.Map;
 
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 /**
- * Decorator for a {@link Transformation} which applies the delegate only when a
- * {@link Predicate} is true (or false, according to {@code negate}).
+ * Wrapper for a {@link Transformation} and corresponding optional {@link Predicate }
+ * which applies the transformation when the {@link Predicate} is true (or false, according to {@code negate}).
+ * If no {@link Predicate} is provided, the transformation will be unconditionally applied.
  * @param <R> The type of record (must be an implementation of {@link ConnectRecord})
  */
-public class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+public class TransformationStage<R extends ConnectRecord<R>> implements AutoCloseable {
 
     static final String PREDICATE_CONFIG = "predicate";
     static final String NEGATE_CONFIG = "negate";
-    final Predicate<R> predicate;
-    final Transformation<R> delegate;
-    final boolean negate;
+    private final Predicate<R> predicate;
+    private final Transformation<R> delegate;

Review Comment:
   Nit: this isn't a delegate anymore; maybe just `transform` or `transformation`?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -268,30 +268,30 @@ public boolean includeRecordDetailsInErrorLog() {
     }
 
     /**
-     * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
+     * Returns the initialized list of {@link TransformationStage} which are specified in {@link #TRANSFORMS_CONFIG}.
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformationStages() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
-        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {
                     String predicatePrefix = PREDICATES_PREFIX + predicateAlias + ".";
                     @SuppressWarnings("unchecked")
                     Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
                     predicate.configure(originalsWithPrefix(predicatePrefix));
-                    transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));
+                    transformations.add(new TransformationStage<>(predicate, negate != null && Boolean.parseBoolean(negate.toString()), transformation));

Review Comment:
   I know IntelliJ loves to suggest "simplifying" boolean expressions by replacing uses of the ternary operator with `&&` but in cases like this I find the former significantly more readable.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##########
@@ -16,63 +16,58 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import java.util.Map;
 
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 /**
- * Decorator for a {@link Transformation} which applies the delegate only when a
- * {@link Predicate} is true (or false, according to {@code negate}).
+ * Wrapper for a {@link Transformation} and corresponding optional {@link Predicate }

Review Comment:
   Nit: whitespace
   ```suggestion
    * Wrapper for a {@link Transformation} and corresponding optional {@link Predicate}
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -531,8 +543,17 @@ private Converter badConverter() {
         return converter;
     }
 
+    private void mockSourceTransform() {
+        FaultyPassthrough<SourceRecord> faultyPassthrough = new FaultyPassthrough<>();
+        doReturn(FaultyPassthrough.class).when(transformationStage).transformClass();
+        when(transformationStage.apply(any())).thenAnswer(invocation -> faultyPassthrough.apply(invocation.getArgument(0)));
+    }

Review Comment:
   Why are we introducing mocking for transformations in this test suite? I removed all the mocking and touched up the couple of lines that dealt with instantiating transformation chains in `createSinkTask` and `createSourceTask` and all the tests continued to pass.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##########
@@ -280,25 +283,18 @@ public void abstractPredicate() {
         assertTrue(e.getMessage().contains("Predicate is abstract and cannot be created"));
     }
 
-    private void assertPredicatedTransform(Map<String, String> props, boolean expectedNegated) {
+    private void assertTransformationStageWithPredicate(Map<String, String> props, boolean expectedNegated) {
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<Transformation<R>> transformations = config.transformations();
-        assertEquals(1, transformations.size());
-        assertTrue(transformations.get(0) instanceof PredicatedTransformation);
-        PredicatedTransformation<?> predicated = (PredicatedTransformation<?>) transformations.get(0);
-
-        assertEquals(expectedNegated, predicated.negate);
-
-        assertTrue(predicated.delegate instanceof ConnectorConfigTest.SimpleTransformation);
-        assertEquals(42, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
+        final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages();
+        assertEquals(1, transformationStages.size());
+        TransformationStage<SinkRecord> stage = transformationStages.get(0);
 
-        assertTrue(predicated.predicate instanceof ConnectorConfigTest.TestPredicate);
-        assertEquals(84, ((TestPredicate<?>) predicated.predicate).param);
+        assertEquals(expectedNegated ? 42 : 0, stage.apply(DUMMY_RECORD).kafkaPartition().intValue());
 
-        predicated.close();
+        SinkRecord matchingRecord = DUMMY_RECORD.newRecord(null, 84, null, null, null, null, 0L);
+        assertEquals(expectedNegated ? 84 : 42, stage.apply(matchingRecord).kafkaPartition().intValue());
 
-        assertEquals(0, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
-        assertEquals(0, ((TestPredicate<?>) predicated.predicate).param);

Review Comment:
   Aren't we losing coverage here?
   
   Ah, it looks like this is already covered by `TransformationStageTest`. The change here should be safe.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -268,30 +268,30 @@ public boolean includeRecordDetailsInErrorLog() {
     }
 
     /**
-     * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
+     * Returns the initialized list of {@link TransformationStage} which are specified in {@link #TRANSFORMS_CONFIG}.

Review Comment:
   This isn't strictly correct anymore; users don't specify `TransformationStage`s in connector configs, they specify `Transformation`s.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##########
@@ -280,25 +283,18 @@ public void abstractPredicate() {
         assertTrue(e.getMessage().contains("Predicate is abstract and cannot be created"));
     }
 
-    private void assertPredicatedTransform(Map<String, String> props, boolean expectedNegated) {
+    private void assertTransformationStageWithPredicate(Map<String, String> props, boolean expectedNegated) {
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<Transformation<R>> transformations = config.transformations();
-        assertEquals(1, transformations.size());
-        assertTrue(transformations.get(0) instanceof PredicatedTransformation);
-        PredicatedTransformation<?> predicated = (PredicatedTransformation<?>) transformations.get(0);
-
-        assertEquals(expectedNegated, predicated.negate);
-
-        assertTrue(predicated.delegate instanceof ConnectorConfigTest.SimpleTransformation);

Review Comment:
   We can keep most of the coverage provided by this line and add a test for the `transformClass` method with a single line here:
   
   ```java
   assertTrue(SimpleTransformation.class.isAssignableFrom(stage.transformClass()));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13184:
URL: https://github.com/apache/kafka/pull/13184#issuecomment-1413610633

   To be honest, when I read this code a class which is "kind of" like a Transformation but throws because some methods are not implemented made more sense than wrapping Transformations in a (Maybe)PredicatedTransformation. As such, I am more in favour of the approach currently taken. @tombentley what is your opinion since you implemented this class?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante merged pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante merged PR #13184:
URL: https://github.com/apache/kafka/pull/13184


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] yashmayya commented on pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on PR #13184:
URL: https://github.com/apache/kafka/pull/13184#issuecomment-1422441303

   Side note - this is the first PR with a fully green build that I've seen in ages here 😄


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1119392537


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##########
@@ -280,25 +283,18 @@ public void abstractPredicate() {
         assertTrue(e.getMessage().contains("Predicate is abstract and cannot be created"));
     }
 
-    private void assertPredicatedTransform(Map<String, String> props, boolean expectedNegated) {
+    private void assertTransformationStageWithPredicate(Map<String, String> props, boolean expectedNegated) {
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<Transformation<R>> transformations = config.transformations();
-        assertEquals(1, transformations.size());
-        assertTrue(transformations.get(0) instanceof PredicatedTransformation);
-        PredicatedTransformation<?> predicated = (PredicatedTransformation<?>) transformations.get(0);
-
-        assertEquals(expectedNegated, predicated.negate);
-
-        assertTrue(predicated.delegate instanceof ConnectorConfigTest.SimpleTransformation);
-        assertEquals(42, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
+        final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages();
+        assertEquals(1, transformationStages.size());
+        TransformationStage<SinkRecord> stage = transformationStages.get(0);
 
-        assertTrue(predicated.predicate instanceof ConnectorConfigTest.TestPredicate);
-        assertEquals(84, ((TestPredicate<?>) predicated.predicate).param);
+        assertEquals(expectedNegated ? 42 : 0, stage.apply(DUMMY_RECORD).kafkaPartition().intValue());
 
-        predicated.close();
+        SinkRecord matchingRecord = DUMMY_RECORD.newRecord(null, 84, null, null, null, null, 0L);
+        assertEquals(expectedNegated ? 84 : 42, stage.apply(matchingRecord).kafkaPartition().intValue());
 
-        assertEquals(0, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
-        assertEquals(0, ((TestPredicate<?>) predicated.predicate).param);

Review Comment:
   The value in the TestPredicate also affects the outward behavior in the other assertions here. We aren't asserting that the field is set, but we are asserting that the configuration affects the behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13184:
URL: https://github.com/apache/kafka/pull/13184#issuecomment-1414001178

   @clolov Perhaps that's where a name change would help. If this was named `TransformationStage`, with a `TransformationChain` made up of multiple `TransformationStage`s and a `TransformationStage` is made up of an optional `Predicate` and required `Transformation`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1103295046


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -270,28 +270,28 @@ public boolean includeRecordDetailsInErrorLog() {
     /**
      * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformations() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
-        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {

Review Comment:
   Yeah not related but it seems like a tiny change that we could include.
   If someone explicitly provided predicate = `""`, the behavior would be:
   1. this check passes
   2. the predicatePrefix would be `predicates..`
   3. It would look for `predicates..type` containing a classname
   4. It would configure the predicate with `predicates..*` prefixed configs.
   
   I think _technically_ an empty predicate name is functional, and could exist out there in the wild.
   It would be very silly, however.
   
   I think this means that it's more correct to change the default to `null`? It won't have a behavior change, but will prevent a bug if we ever used the parsed configs here.
   I'll make the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1103260370


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,15 +500,25 @@ private void expectTopicCreation(String topic) {
         }
     }
 
+    private void mockSinkTransform() {
+        FaultyPassthrough<SinkRecord> faultyPassthrough = new FaultyPassthrough<>();
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) (Class<?>) FaultyPassthrough.class;
+        OngoingStubbing<Class<? extends Transformation<?>>> transformClass = when(transformation.transformClass());
+        transformClass.thenReturn(value);

Review Comment:
   This also works around the type issues i was having, incredible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] yashmayya commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1104098263


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##########
@@ -16,54 +16,49 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import java.util.Map;
 
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 /**
- * Decorator for a {@link Transformation} which applies the delegate only when a
- * {@link Predicate} is true (or false, according to {@code negate}).
+ * Wrapper for a {@link Transformation} and corresponding optional {@link Predicate }
+ * which applies the transformation when the {@link Predicate} is true (or false, according to {@code negate}).
+ * If no {@link Predicate} is provided, the transformation will be unconditionally applied.
  * @param <R> The type of record (must be an implementation of {@link ConnectRecord})
  */
-public class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+public class TransformationStage<R extends ConnectRecord<R>> implements AutoCloseable {
 
     static final String PREDICATE_CONFIG = "predicate";
     static final String NEGATE_CONFIG = "negate";
-    final Predicate<R> predicate;
-    final Transformation<R> delegate;
-    final boolean negate;
+    private final Predicate<R> predicate;
+    private final Transformation<R> delegate;
+    private final boolean negate;
 
-    PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
+    TransformationStage(Transformation<R> delegate) {
+        this(null, false, delegate);
+    }
+
+    TransformationStage(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
         this.predicate = predicate;
         this.negate = negate;
         this.delegate = delegate;
     }
 
-    @Override
-    public void configure(Map<String, ?> configs) {
-        throw new ConnectException(PredicatedTransformation.class.getName() + ".configure() " +
-                "should never be called directly.");
+    public Class<? extends Transformation<R>> transformClass() {
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<R>> transformClass = (Class<? extends Transformation<R>>) delegate.getClass();
+        return transformClass;
     }
 
-    @Override
     public R apply(R record) {
-        if (negate ^ predicate.test(record)) {
+        if (predicate == null || negate ^ predicate.test(record)) {
             return delegate.apply(record);
         }
         return record;
     }
 
-    @Override
-    public ConfigDef config() {
-        throw new ConnectException(PredicatedTransformation.class.getName() + ".config() " +
-                "should never be called directly.");
-    }
-
     @Override
     public void close() {
         Utils.closeQuietly(delegate, "predicated transformation");

Review Comment:
   ```suggestion
           Utils.closeQuietly(delegate, "transformation");
   ```
   nit: Since this may or may not be a predicated transformation now.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -270,28 +270,28 @@ public boolean includeRecordDetailsInErrorLog() {
     /**
      * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformations() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
-        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {

Review Comment:
   Thanks, the analysis makes sense and I agree that `null` seems like an appropriate default value here.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##########
@@ -147,10 +151,10 @@ public void singleTransform() {
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "42");
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<Transformation<R>> transformations = config.transformations();
+        final List<TransformationStage<SinkRecord>> transformations = config.transformations();
         assertEquals(1, transformations.size());
-        final SimpleTransformation<R> xform = (SimpleTransformation<R>) transformations.get(0);
-        assertEquals(42, xform.magicNumber);
+        final TransformationStage<SinkRecord> xform = transformations.get(0);
+        assertEquals(42, xform.apply(DUMMY_RECORD).kafkaPartition().intValue());

Review Comment:
   Sounds good, I didn't really have any concerns with these changes; just curious about why they were being made.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,15 +500,25 @@ private void expectTopicCreation(String topic) {
         }
     }
 
+    private void mockSinkTransform() {
+        FaultyPassthrough<SinkRecord> faultyPassthrough = new FaultyPassthrough<>();
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) (Class<?>) FaultyPassthrough.class;

Review Comment:
   Huh, interesting, TIL.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##########
@@ -33,6 +34,7 @@
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;

Review Comment:
   This unused import is causing a checkstyle failure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1103287942


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##########
@@ -147,10 +151,10 @@ public void singleTransform() {
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "42");
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<Transformation<R>> transformations = config.transformations();
+        final List<TransformationStage<SinkRecord>> transformations = config.transformations();
         assertEquals(1, transformations.size());
-        final SimpleTransformation<R> xform = (SimpleTransformation<R>) transformations.get(0);
-        assertEquals(42, xform.magicNumber);
+        final TransformationStage<SinkRecord> xform = transformations.get(0);
+        assertEquals(42, xform.apply(DUMMY_RECORD).kafkaPartition().intValue());

Review Comment:
   Both this refactor and a later refactor that I have planned are complicated by this sort of testing that depends on the _implementation_ of these classes (downcasting and field access). The new style depends on the _behavior_ of the configured instances, and works if the implementation details of the intermediate classes changes.
   
   While it would be possible to preserve the original style by accessing the package-local PredicatedTransform delegate field, that is not possible for the next refactor I have planned. Rather than revisiting this test later, I figured I would fix it once and avoid the potential merge conflicts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] yashmayya commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1099951075


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -270,28 +270,28 @@ public boolean includeRecordDetailsInErrorLog() {
     /**
      * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformations() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
-        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {

Review Comment:
   The default value for `PREDICATE_CONFIG` is `""` and not `null`; it looks like this check currently works fine because we're using the original configs and not the parsed configs. I wonder if we should update the default value and / or this check here to be consistent?
   
   Edit: This isn't really related to your change; just thought I'd bring it up since we were in the area.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] yashmayya commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1099898576


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -270,28 +270,28 @@ public boolean includeRecordDetailsInErrorLog() {
     /**
      * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformations() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
-        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {
                     String predicatePrefix = PREDICATES_PREFIX + predicateAlias + ".";
                     @SuppressWarnings("unchecked")
                     Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
                     predicate.configure(originalsWithPrefix(predicatePrefix));
-                    transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));
+                    transformations.add(new TransformationStage<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));

Review Comment:
   ```suggestion
                       transformations.add(new TransformationStage<>(predicate, negate != null && Boolean.parseBoolean(negate.toString()), transformation));
   ```
   
   nit: negate condition can be simplified



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -270,28 +270,28 @@ public boolean includeRecordDetailsInErrorLog() {
     /**
      * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformations() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
-        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {

Review Comment:
   The default value for `PREDICATE_CONFIG` is `""` and not `null`; it looks like this check currently works fine because we're using the original configs and not the parsed configs. I wonder if we should update the default value and / or this check here to be consistent?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##########
@@ -147,10 +151,10 @@ public void singleTransform() {
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "42");
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<Transformation<R>> transformations = config.transformations();
+        final List<TransformationStage<SinkRecord>> transformations = config.transformations();
         assertEquals(1, transformations.size());
-        final SimpleTransformation<R> xform = (SimpleTransformation<R>) transformations.get(0);
-        assertEquals(42, xform.magicNumber);
+        final TransformationStage<SinkRecord> xform = transformations.get(0);
+        assertEquals(42, xform.apply(DUMMY_RECORD).kafkaPartition().intValue());

Review Comment:
   Looks like these tests were setup to verify that the `ConnectorConfig::transformations` method works as expected; any reason why we're modifying them to verify the `SimpleTransformation` functionality (and hence only indirectly testing `ConnectorConfig::transformations`)?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java:
##########
@@ -39,17 +44,21 @@ public void apply() {
     private void applyAndAssert(boolean predicateResult, boolean negate,
                                 SourceRecord expectedResult) {
 
-        SamplePredicate predicate = new SamplePredicate(predicateResult);
-        SampleTransformation<SourceRecord> predicatedTransform = new SampleTransformation<>(transformed);
-        PredicatedTransformation<SourceRecord> pt = new PredicatedTransformation<>(
+        @SuppressWarnings("unchecked")
+        Predicate<SourceRecord> predicate = mock(Predicate.class);
+        when(predicate.test(any())).thenReturn(predicateResult);
+        @SuppressWarnings("unchecked")
+        Transformation<SourceRecord> predicatedTransform = mock(Transformation.class);
+        when(predicatedTransform.apply(any())).thenReturn(transformed);
+        TransformationStage<SourceRecord> pt = new TransformationStage<>(

Review Comment:
   ```suggestion
           TransformationStage<SourceRecord> transformationStage = new TransformationStage<>(
   ```
   nit



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -531,8 +547,20 @@ private Converter badConverter() {
         return converter;
     }
 
+    private void mockSourceTransform() {
+        FaultyPassthrough<SourceRecord> faultyPassthrough = new FaultyPassthrough<>();
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) (Class<?>) FaultyPassthrough.class;
+        OngoingStubbing<Class<? extends Transformation<?>>> transformClass = when(transformation.transformClass());
+        transformClass.thenReturn(value);

Review Comment:
   ```suggestion
           doReturn(FaultyPassthrough.class).when(transformation).transformClass();
   ```
   Same suggestion as above



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,15 +500,25 @@ private void expectTopicCreation(String topic) {
         }
     }
 
+    private void mockSinkTransform() {
+        FaultyPassthrough<SinkRecord> faultyPassthrough = new FaultyPassthrough<>();
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) (Class<?>) FaultyPassthrough.class;
+        OngoingStubbing<Class<? extends Transformation<?>>> transformClass = when(transformation.transformClass());
+        transformClass.thenReturn(value);

Review Comment:
   ```suggestion
           doReturn(FaultyPassthrough.class).when(transformation).transformClass();
   ```
   
   This might be more readable, WDYT?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,15 +500,25 @@ private void expectTopicCreation(String topic) {
         }
     }
 
+    private void mockSinkTransform() {
+        FaultyPassthrough<SinkRecord> faultyPassthrough = new FaultyPassthrough<>();
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) (Class<?>) FaultyPassthrough.class;

Review Comment:
   ```suggestion
           Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) FaultyPassthrough.class;
   ```
   
   nit: looks like a redundant type cast?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -122,6 +123,8 @@ public class ErrorHandlingTaskTest {
 
     @Mock
     Plugins plugins;
+    @Mock
+    TransformationStage<?> transformation;

Review Comment:
   ```suggestion
       TransformationStage<?> transformationStage;
   ```
   
   nit: since it's a little confusing to read `transformation.transformClass`



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##########
@@ -30,40 +27,37 @@
  * {@link Predicate} is true (or false, according to {@code negate}).

Review Comment:
   We might want to update this Javadoc now that this class is used even when predicates aren't configured?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##########
@@ -30,40 +27,37 @@
  * {@link Predicate} is true (or false, according to {@code negate}).
  * @param <R>
  */
-public class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+public class TransformationStage<R extends ConnectRecord<R>> implements AutoCloseable {
 
     static final String PREDICATE_CONFIG = "predicate";
     static final String NEGATE_CONFIG = "negate";
     final Predicate<R> predicate;
     final Transformation<R> delegate;
     final boolean negate;
 
-    PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
+    TransformationStage(Transformation<R> delegate) {
+        this(null, false, delegate);
+    }
+
+    TransformationStage(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
         this.predicate = predicate;
         this.negate = negate;
         this.delegate = delegate;
     }
 
-    @Override
-    public void configure(Map<String, ?> configs) {
-        throw new ConnectException(PredicatedTransformation.class.getName() + ".configure() " +
-                "should never be called directly.");
+    public Class<? extends Transformation<R>> transformClass() {
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<R>> transformClass = (Class<? extends Transformation<R>>) delegate.getClass();
+        return transformClass;
     }
 
-    @Override
     public R apply(R record) {
-        if (negate ^ predicate.test(record)) {
+        if (predicate == null || negate ^ predicate.test(record)) {
             return delegate.apply(record);
         }
         return record;
     }
 
-    @Override
-    public ConfigDef config() {
-        throw new ConnectException(PredicatedTransformation.class.getName() + ".config() " +
-                "should never be called directly.");
-    }
-
     @Override
     public void close() {
         Utils.closeQuietly(delegate, "predicated");

Review Comment:
   ```suggestion
           Utils.closeQuietly(delegate, "transformation");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13184:
URL: https://github.com/apache/kafka/pull/13184#issuecomment-1415734118

   I see what you mean. Okay, I am happy with it. Could you just rename the variable names from transformation to transformationStage as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org