Posted to jira@kafka.apache.org by "yashmayya (via GitHub)" <gi...@apache.org> on 2023/02/08 11:21:27 UTC
[GitHub] [kafka] yashmayya commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation
yashmayya commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1099898576
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -270,28 +270,28 @@ public boolean includeRecordDetailsInErrorLog() {
/**
* Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
*/
- public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+ public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformations() {
final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
- final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+ final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
for (String alias : transformAliases) {
final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
try {
@SuppressWarnings("unchecked")
final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
Map<String, Object> configs = originalsWithPrefix(prefix);
- Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
- Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+ Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+ Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
transformation.configure(configs);
if (predicateAlias != null) {
String predicatePrefix = PREDICATES_PREFIX + predicateAlias + ".";
@SuppressWarnings("unchecked")
Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
predicate.configure(originalsWithPrefix(predicatePrefix));
- transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));
+ transformations.add(new TransformationStage<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));
Review Comment:
```suggestion
transformations.add(new TransformationStage<>(predicate, negate != null && Boolean.parseBoolean(negate.toString()), transformation));
```
nit: negate condition can be simplified
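
As a quick sanity check that the two forms are interchangeable, here is a minimal standalone sketch. The class and method names (`NegateParsing`, `ternary`, `shortCircuit`) are made up for illustration and are not part of the PR; only `Boolean.parseBoolean` is a real JDK call.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, used only to compare the two expressions.
final class NegateParsing {
    // Form currently in the diff.
    static boolean ternary(Object negate) {
        return negate == null ? false : Boolean.parseBoolean(negate.toString());
    }

    // Form proposed in the suggestion above.
    static boolean shortCircuit(Object negate) {
        return negate != null && Boolean.parseBoolean(negate.toString());
    }

    public static void main(String[] args) {
        // Sample inputs, including null and non-String values.
        List<Object> inputs = Arrays.asList(null, "true", "TRUE", "false", "", Boolean.TRUE, "yes");
        for (Object input : inputs) {
            if (ternary(input) != shortCircuit(input)) {
                throw new AssertionError("Mismatch for input: " + input);
            }
        }
        System.out.println("Both forms agree on all sample inputs");
    }
}
```

Both forms return `false` for `null` and otherwise defer to `Boolean.parseBoolean`, so the change should be purely cosmetic.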
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -270,28 +270,28 @@ public boolean includeRecordDetailsInErrorLog() {
/**
* Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
*/
- public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+ public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformations() {
final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
- final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+ final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
for (String alias : transformAliases) {
final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
try {
@SuppressWarnings("unchecked")
final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
Map<String, Object> configs = originalsWithPrefix(prefix);
- Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
- Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+ Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+ Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
transformation.configure(configs);
if (predicateAlias != null) {
Review Comment:
The default value for `PREDICATE_CONFIG` is `""`, not `null`. This check currently works only because we're reading from the original configs rather than the parsed configs. I wonder if we should update the default value and/or this check to be consistent?
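
To illustrate the inconsistency, here is a rough sketch that uses plain maps as stand-ins for the original and parsed configs. Everything in it (`PredicateDefaultSketch`, `fromOriginals`, `fromParsed`, `hasPredicate`) is hypothetical and only mimics the behaviour described above; it does not use the real `AbstractConfig` API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in; not the real ConnectorConfig or AbstractConfig.
final class PredicateDefaultSketch {
    // Assumed to mirror the declared default of the "predicate" config.
    static final String PREDICATE_DEFAULT = "";

    // Original configs: a key the user did not set is simply absent, so remove() yields null.
    static Object fromOriginals(Map<String, Object> originals) {
        return new HashMap<>(originals).remove("predicate");
    }

    // Parsed configs: a key the user did not set is filled in with the declared default.
    static Object fromParsed(Map<String, Object> originals) {
        Map<String, Object> parsed = new HashMap<>(originals);
        parsed.putIfAbsent("predicate", PREDICATE_DEFAULT);
        return parsed.remove("predicate");
    }

    // One possible check that gives the same answer for either source.
    static boolean hasPredicate(Object alias) {
        return alias != null && !alias.toString().isEmpty();
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("type", "SomeTransform");
        System.out.println(fromOriginals(props) == null);         // a bare null check passes here
        System.out.println("".equals(fromParsed(props)));         // but would not be enough here
        System.out.println(hasPredicate(fromOriginals(props)) == hasPredicate(fromParsed(props)));
    }
}
```

A null-or-empty check like `hasPredicate` would keep working if this code ever switched to the parsed configs.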
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##########
@@ -147,10 +151,10 @@ public void singleTransform() {
props.put("transforms.a.type", SimpleTransformation.class.getName());
props.put("transforms.a.magic.number", "42");
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
- final List<Transformation<R>> transformations = config.transformations();
+ final List<TransformationStage<SinkRecord>> transformations = config.transformations();
assertEquals(1, transformations.size());
- final SimpleTransformation<R> xform = (SimpleTransformation<R>) transformations.get(0);
- assertEquals(42, xform.magicNumber);
+ final TransformationStage<SinkRecord> xform = transformations.get(0);
+ assertEquals(42, xform.apply(DUMMY_RECORD).kafkaPartition().intValue());
Review Comment:
Looks like these tests were set up to verify that the `ConnectorConfig::transformations` method works as expected; any reason why we're modifying them to verify the `SimpleTransformation` functionality (and hence only indirectly testing `ConnectorConfig::transformations`)?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java:
##########
@@ -39,17 +44,21 @@ public void apply() {
private void applyAndAssert(boolean predicateResult, boolean negate,
SourceRecord expectedResult) {
- SamplePredicate predicate = new SamplePredicate(predicateResult);
- SampleTransformation<SourceRecord> predicatedTransform = new SampleTransformation<>(transformed);
- PredicatedTransformation<SourceRecord> pt = new PredicatedTransformation<>(
+ @SuppressWarnings("unchecked")
+ Predicate<SourceRecord> predicate = mock(Predicate.class);
+ when(predicate.test(any())).thenReturn(predicateResult);
+ @SuppressWarnings("unchecked")
+ Transformation<SourceRecord> predicatedTransform = mock(Transformation.class);
+ when(predicatedTransform.apply(any())).thenReturn(transformed);
+ TransformationStage<SourceRecord> pt = new TransformationStage<>(
Review Comment:
```suggestion
TransformationStage<SourceRecord> transformationStage = new TransformationStage<>(
```
nit
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -531,8 +547,20 @@ private Converter badConverter() {
return converter;
}
+ private void mockSourceTransform() {
+ FaultyPassthrough<SourceRecord> faultyPassthrough = new FaultyPassthrough<>();
+ @SuppressWarnings("unchecked")
+ Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) (Class<?>) FaultyPassthrough.class;
+ OngoingStubbing<Class<? extends Transformation<?>>> transformClass = when(transformation.transformClass());
+ transformClass.thenReturn(value);
Review Comment:
```suggestion
doReturn(FaultyPassthrough.class).when(transformation).transformClass();
```
Same suggestion as for `mockSinkTransform`
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,15 +500,25 @@ private void expectTopicCreation(String topic) {
}
}
+ private void mockSinkTransform() {
+ FaultyPassthrough<SinkRecord> faultyPassthrough = new FaultyPassthrough<>();
+ @SuppressWarnings("unchecked")
+ Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) (Class<?>) FaultyPassthrough.class;
+ OngoingStubbing<Class<? extends Transformation<?>>> transformClass = when(transformation.transformClass());
+ transformClass.thenReturn(value);
Review Comment:
```suggestion
doReturn(FaultyPassthrough.class).when(transformation).transformClass();
```
This might be more readable, WDYT?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,15 +500,25 @@ private void expectTopicCreation(String topic) {
}
}
+ private void mockSinkTransform() {
+ FaultyPassthrough<SinkRecord> faultyPassthrough = new FaultyPassthrough<>();
+ @SuppressWarnings("unchecked")
+ Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) (Class<?>) FaultyPassthrough.class;
Review Comment:
```suggestion
Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) FaultyPassthrough.class;
```
nit: looks like a redundant type cast?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -122,6 +123,8 @@ public class ErrorHandlingTaskTest {
@Mock
Plugins plugins;
+ @Mock
+ TransformationStage<?> transformation;
Review Comment:
```suggestion
TransformationStage<?> transformationStage;
```
nit: since it's a little confusing to read `transformation.transformClass`
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##########
@@ -30,40 +27,37 @@
* {@link Predicate} is true (or false, according to {@code negate}).
Review Comment:
We might want to update this Javadoc now that this class is used even when predicates aren't configured?
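
For reference, the behaviour the updated Javadoc would need to describe can be sketched as below. This is a simplified stand-in built on `java.util.function` types rather than the Connect `Predicate` and `Transformation` interfaces, and `StageSketch` is a made-up name; the condition in `apply` is copied from the `apply` method in this PR.

```java
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for TransformationStage, using JDK functional interfaces.
final class StageSketch<R> {
    private final Predicate<R> predicate; // null when no predicate is configured
    private final boolean negate;
    private final UnaryOperator<R> delegate;

    StageSketch(Predicate<R> predicate, boolean negate, UnaryOperator<R> delegate) {
        this.predicate = predicate;
        this.negate = negate;
        this.delegate = delegate;
    }

    R apply(R record) {
        // No predicate: always transform. Otherwise transform when the
        // predicate result differs from negate (XOR).
        if (predicate == null || negate ^ predicate.test(record)) {
            return delegate.apply(record);
        }
        return record;
    }

    public static void main(String[] args) {
        UnaryOperator<String> upper = String::toUpperCase;
        Predicate<String> startsWithA = s -> s.startsWith("a");
        System.out.println(new StageSketch<String>(null, false, upper).apply("abc"));
        System.out.println(new StageSketch<String>(startsWithA, false, upper).apply("xyz"));
        System.out.println(new StageSketch<String>(startsWithA, true, upper).apply("xyz"));
    }
}
```

So the Javadoc could say something along the lines of: the transformation is always applied when no predicate is configured, and otherwise only when the predicate result, inverted if `negate` is set, is true.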
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##########
@@ -30,40 +27,37 @@
* {@link Predicate} is true (or false, according to {@code negate}).
* @param <R>
*/
-public class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+public class TransformationStage<R extends ConnectRecord<R>> implements AutoCloseable {
static final String PREDICATE_CONFIG = "predicate";
static final String NEGATE_CONFIG = "negate";
final Predicate<R> predicate;
final Transformation<R> delegate;
final boolean negate;
- PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
+ TransformationStage(Transformation<R> delegate) {
+ this(null, false, delegate);
+ }
+
+ TransformationStage(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
this.predicate = predicate;
this.negate = negate;
this.delegate = delegate;
}
- @Override
- public void configure(Map<String, ?> configs) {
- throw new ConnectException(PredicatedTransformation.class.getName() + ".configure() " +
- "should never be called directly.");
+ public Class<? extends Transformation<R>> transformClass() {
+ @SuppressWarnings("unchecked")
+ Class<? extends Transformation<R>> transformClass = (Class<? extends Transformation<R>>) delegate.getClass();
+ return transformClass;
}
- @Override
public R apply(R record) {
- if (negate ^ predicate.test(record)) {
+ if (predicate == null || negate ^ predicate.test(record)) {
return delegate.apply(record);
}
return record;
}
- @Override
- public ConfigDef config() {
- throw new ConnectException(PredicatedTransformation.class.getName() + ".config() " +
- "should never be called directly.");
- }
-
@Override
public void close() {
Utils.closeQuietly(delegate, "predicated");
Review Comment:
```suggestion
Utils.closeQuietly(delegate, "transformation");
```