You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/02/27 21:02:23 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

C0urante commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1119281474


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##########
@@ -16,63 +16,58 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import java.util.Map;
 
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 /**
- * Decorator for a {@link Transformation} which applies the delegate only when a
- * {@link Predicate} is true (or false, according to {@code negate}).
+ * Wrapper for a {@link Transformation} and corresponding optional {@link Predicate }
+ * which applies the transformation when the {@link Predicate} is true (or false, according to {@code negate}).
+ * If no {@link Predicate} is provided, the transformation will be unconditionally applied.
  * @param <R> The type of record (must be an implementation of {@link ConnectRecord})
  */
-public class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+public class TransformationStage<R extends ConnectRecord<R>> implements AutoCloseable {
 
     static final String PREDICATE_CONFIG = "predicate";
     static final String NEGATE_CONFIG = "negate";
-    final Predicate<R> predicate;
-    final Transformation<R> delegate;
-    final boolean negate;
+    private final Predicate<R> predicate;
+    private final Transformation<R> delegate;

Review Comment:
   Nit: this isn't a delegate anymore; maybe just `transform` or `transformation`?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -268,30 +268,30 @@ public boolean includeRecordDetailsInErrorLog() {
     }
 
     /**
-     * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
+     * Returns the initialized list of {@link TransformationStage} which are specified in {@link #TRANSFORMS_CONFIG}.
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformationStages() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
-        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {
                     String predicatePrefix = PREDICATES_PREFIX + predicateAlias + ".";
                     @SuppressWarnings("unchecked")
                     Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
                     predicate.configure(originalsWithPrefix(predicatePrefix));
-                    transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));
+                    transformations.add(new TransformationStage<>(predicate, negate != null && Boolean.parseBoolean(negate.toString()), transformation));

Review Comment:
   I know IntelliJ loves to suggest "simplifying" boolean expressions by replacing uses of the ternary operator with `&&`, but in cases like this I find the ternary form significantly more readable.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##########
@@ -16,63 +16,58 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import java.util.Map;
 
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 /**
- * Decorator for a {@link Transformation} which applies the delegate only when a
- * {@link Predicate} is true (or false, according to {@code negate}).
+ * Wrapper for a {@link Transformation} and corresponding optional {@link Predicate }

Review Comment:
   Nit: whitespace
   ```suggestion
    * Wrapper for a {@link Transformation} and corresponding optional {@link Predicate}
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -531,8 +543,17 @@ private Converter badConverter() {
         return converter;
     }
 
+    private void mockSourceTransform() {
+        FaultyPassthrough<SourceRecord> faultyPassthrough = new FaultyPassthrough<>();
+        doReturn(FaultyPassthrough.class).when(transformationStage).transformClass();
+        when(transformationStage.apply(any())).thenAnswer(invocation -> faultyPassthrough.apply(invocation.getArgument(0)));
+    }

Review Comment:
   Why are we introducing mocking for transformations in this test suite? I removed all the mocking and touched up the couple of lines that dealt with instantiating transformation chains in `createSinkTask` and `createSourceTask`, and all the tests continued to pass.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##########
@@ -280,25 +283,18 @@ public void abstractPredicate() {
         assertTrue(e.getMessage().contains("Predicate is abstract and cannot be created"));
     }
 
-    private void assertPredicatedTransform(Map<String, String> props, boolean expectedNegated) {
+    private void assertTransformationStageWithPredicate(Map<String, String> props, boolean expectedNegated) {
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<Transformation<R>> transformations = config.transformations();
-        assertEquals(1, transformations.size());
-        assertTrue(transformations.get(0) instanceof PredicatedTransformation);
-        PredicatedTransformation<?> predicated = (PredicatedTransformation<?>) transformations.get(0);
-
-        assertEquals(expectedNegated, predicated.negate);
-
-        assertTrue(predicated.delegate instanceof ConnectorConfigTest.SimpleTransformation);
-        assertEquals(42, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
+        final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages();
+        assertEquals(1, transformationStages.size());
+        TransformationStage<SinkRecord> stage = transformationStages.get(0);
 
-        assertTrue(predicated.predicate instanceof ConnectorConfigTest.TestPredicate);
-        assertEquals(84, ((TestPredicate<?>) predicated.predicate).param);
+        assertEquals(expectedNegated ? 42 : 0, stage.apply(DUMMY_RECORD).kafkaPartition().intValue());
 
-        predicated.close();
+        SinkRecord matchingRecord = DUMMY_RECORD.newRecord(null, 84, null, null, null, null, 0L);
+        assertEquals(expectedNegated ? 84 : 42, stage.apply(matchingRecord).kafkaPartition().intValue());
 
-        assertEquals(0, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
-        assertEquals(0, ((TestPredicate<?>) predicated.predicate).param);

Review Comment:
   Aren't we losing coverage here?
   
   Ah, it looks like this is already covered by `TransformationStageTest`. The change here should be safe.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -268,30 +268,30 @@ public boolean includeRecordDetailsInErrorLog() {
     }
 
     /**
-     * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
+     * Returns the initialized list of {@link TransformationStage} which are specified in {@link #TRANSFORMS_CONFIG}.

Review Comment:
   This isn't strictly correct anymore; users don't specify `TransformationStage`s in connector configs. They specify `Transformation`s instead.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##########
@@ -280,25 +283,18 @@ public void abstractPredicate() {
         assertTrue(e.getMessage().contains("Predicate is abstract and cannot be created"));
     }
 
-    private void assertPredicatedTransform(Map<String, String> props, boolean expectedNegated) {
+    private void assertTransformationStageWithPredicate(Map<String, String> props, boolean expectedNegated) {
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<Transformation<R>> transformations = config.transformations();
-        assertEquals(1, transformations.size());
-        assertTrue(transformations.get(0) instanceof PredicatedTransformation);
-        PredicatedTransformation<?> predicated = (PredicatedTransformation<?>) transformations.get(0);
-
-        assertEquals(expectedNegated, predicated.negate);
-
-        assertTrue(predicated.delegate instanceof ConnectorConfigTest.SimpleTransformation);

Review Comment:
   We can keep most of the coverage provided by this line and add a test for the `transformClass` method with a single line here:
   
   ```java
   assertTrue(SimpleTransformation.class.isAssignableFrom(stage.transformClass()));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org