You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "yashmayya (via GitHub)" <gi...@apache.org> on 2023/02/08 11:21:27 UTC

[GitHub] [kafka] yashmayya commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

yashmayya commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1099898576


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -270,28 +270,28 @@ public boolean includeRecordDetailsInErrorLog() {
     /**
      * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformations() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
-        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {
                     String predicatePrefix = PREDICATES_PREFIX + predicateAlias + ".";
                     @SuppressWarnings("unchecked")
                     Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
                     predicate.configure(originalsWithPrefix(predicatePrefix));
-                    transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));
+                    transformations.add(new TransformationStage<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));

Review Comment:
   ```suggestion
                       transformations.add(new TransformationStage<>(predicate, negate != null && Boolean.parseBoolean(negate.toString()), transformation));
   ```
   
   nit: negate condition can be simplified



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -270,28 +270,28 @@ public boolean includeRecordDetailsInErrorLog() {
     /**
      * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformations() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
-        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {

Review Comment:
   The default value for `PREDICATE_CONFIG` is `""` and not `null`; it looks like this check currently works fine because we're using the original configs and not the parsed configs. I wonder if we should update the default value and / or this check here to be consistent?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##########
@@ -147,10 +151,10 @@ public void singleTransform() {
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "42");
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<Transformation<R>> transformations = config.transformations();
+        final List<TransformationStage<SinkRecord>> transformations = config.transformations();
         assertEquals(1, transformations.size());
-        final SimpleTransformation<R> xform = (SimpleTransformation<R>) transformations.get(0);
-        assertEquals(42, xform.magicNumber);
+        final TransformationStage<SinkRecord> xform = transformations.get(0);
+        assertEquals(42, xform.apply(DUMMY_RECORD).kafkaPartition().intValue());

Review Comment:
   Looks like these tests were setup to verify that the `ConnectorConfig::transformations` method works as expected; any reason why we're modifying them to verify the `SimpleTransformation` functionality (and hence only indirectly testing `ConnectorConfig::transformations`)?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java:
##########
@@ -39,17 +44,21 @@ public void apply() {
     private void applyAndAssert(boolean predicateResult, boolean negate,
                                 SourceRecord expectedResult) {
 
-        SamplePredicate predicate = new SamplePredicate(predicateResult);
-        SampleTransformation<SourceRecord> predicatedTransform = new SampleTransformation<>(transformed);
-        PredicatedTransformation<SourceRecord> pt = new PredicatedTransformation<>(
+        @SuppressWarnings("unchecked")
+        Predicate<SourceRecord> predicate = mock(Predicate.class);
+        when(predicate.test(any())).thenReturn(predicateResult);
+        @SuppressWarnings("unchecked")
+        Transformation<SourceRecord> predicatedTransform = mock(Transformation.class);
+        when(predicatedTransform.apply(any())).thenReturn(transformed);
+        TransformationStage<SourceRecord> pt = new TransformationStage<>(

Review Comment:
   ```suggestion
           TransformationStage<SourceRecord> transformationStage = new TransformationStage<>(
   ```
   nit



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -531,8 +547,20 @@ private Converter badConverter() {
         return converter;
     }
 
+    private void mockSourceTransform() {
+        FaultyPassthrough<SourceRecord> faultyPassthrough = new FaultyPassthrough<>();
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) (Class<?>) FaultyPassthrough.class;
+        OngoingStubbing<Class<? extends Transformation<?>>> transformClass = when(transformation.transformClass());
+        transformClass.thenReturn(value);

Review Comment:
   ```suggestion
           doReturn(FaultyPassthrough.class).when(transformation).transformClass();
   ```
   Same suggestion as above



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,15 +500,25 @@ private void expectTopicCreation(String topic) {
         }
     }
 
+    private void mockSinkTransform() {
+        FaultyPassthrough<SinkRecord> faultyPassthrough = new FaultyPassthrough<>();
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) (Class<?>) FaultyPassthrough.class;
+        OngoingStubbing<Class<? extends Transformation<?>>> transformClass = when(transformation.transformClass());
+        transformClass.thenReturn(value);

Review Comment:
   ```suggestion
           doReturn(FaultyPassthrough.class).when(transformation).transformClass();
   ```
   
   This might be more readable, WDYT?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,15 +500,25 @@ private void expectTopicCreation(String topic) {
         }
     }
 
+    private void mockSinkTransform() {
+        FaultyPassthrough<SinkRecord> faultyPassthrough = new FaultyPassthrough<>();
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) (Class<?>) FaultyPassthrough.class;

Review Comment:
   ```suggestion
           Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) FaultyPassthrough.class;
   ```
   
   nit: looks like a redundant type cast?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -122,6 +123,8 @@ public class ErrorHandlingTaskTest {
 
     @Mock
     Plugins plugins;
+    @Mock
+    TransformationStage<?> transformation;

Review Comment:
   ```suggestion
       TransformationStage<?> transformationStage;
   ```
   
   nit: since it's a little confusing to read `transformation.transformClass`



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##########
@@ -30,40 +27,37 @@
  * {@link Predicate} is true (or false, according to {@code negate}).

Review Comment:
   We might want to update this Javadoc now that this class is used even when predicates aren't configured?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##########
@@ -30,40 +27,37 @@
  * {@link Predicate} is true (or false, according to {@code negate}).
  * @param <R>
  */
-public class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+public class TransformationStage<R extends ConnectRecord<R>> implements AutoCloseable {
 
     static final String PREDICATE_CONFIG = "predicate";
     static final String NEGATE_CONFIG = "negate";
     final Predicate<R> predicate;
     final Transformation<R> delegate;
     final boolean negate;
 
-    PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
+    TransformationStage(Transformation<R> delegate) {
+        this(null, false, delegate);
+    }
+
+    TransformationStage(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
         this.predicate = predicate;
         this.negate = negate;
         this.delegate = delegate;
     }
 
-    @Override
-    public void configure(Map<String, ?> configs) {
-        throw new ConnectException(PredicatedTransformation.class.getName() + ".configure() " +
-                "should never be called directly.");
+    public Class<? extends Transformation<R>> transformClass() {
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<R>> transformClass = (Class<? extends Transformation<R>>) delegate.getClass();
+        return transformClass;
     }
 
-    @Override
     public R apply(R record) {
-        if (negate ^ predicate.test(record)) {
+        if (predicate == null || negate ^ predicate.test(record)) {
             return delegate.apply(record);
         }
         return record;
     }
 
-    @Override
-    public ConfigDef config() {
-        throw new ConnectException(PredicatedTransformation.class.getName() + ".config() " +
-                "should never be called directly.");
-    }
-
     @Override
     public void close() {
         Utils.closeQuietly(delegate, "predicated");

Review Comment:
   ```suggestion
           Utils.closeQuietly(delegate, "transformation");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org