You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/04 22:50:17 UTC
[kafka] branch 2.6 updated: MINOR: Add explanation for disabling forwarding from value transformers (#8771)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new cb310f1 MINOR: Add explanation for disabling forwarding from value transformers (#8771)
cb310f1 is described below
commit cb310f1e4350e98e56c7df9cb38d827eb6304667
Author: Antony Stubbs <an...@gmail.com>
AuthorDate: Thu Jun 4 23:42:13 2020 +0100
MINOR: Add explanation for disabling forwarding from value transformers (#8771)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
.../internals/ForwardingDisabledProcessorContext.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index ba39368..2b8043a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -40,6 +40,11 @@ import java.util.Objects;
public final class ForwardingDisabledProcessorContext implements ProcessorContext {
private final ProcessorContext delegate;
+ private static final String EXPLANATION = "ProcessorContext#forward() is not supported from this context, "
+ + "as the framework must ensure the key is not changed (#forward allows changing the key on "
+ + "messages which are sent). Try another function, which doesn't allow the key to be changed "
+ + "(for example - #tranformValues).";
+
public ForwardingDisabledProcessorContext(final ProcessorContext delegate) {
this.delegate = Objects.requireNonNull(delegate, "delegate");
}
@@ -102,24 +107,24 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
@Override
public <K, V> void forward(final K key, final V value) {
- throw new StreamsException("ProcessorContext#forward() not supported.");
+ throw new StreamsException(EXPLANATION);
}
@Override
public <K, V> void forward(final K key, final V value, final To to) {
- throw new StreamsException("ProcessorContext#forward() not supported.");
+ throw new StreamsException(EXPLANATION);
}
@Override
@Deprecated
public <K, V> void forward(final K key, final V value, final int childIndex) {
- throw new StreamsException("ProcessorContext#forward() not supported.");
+ throw new StreamsException(EXPLANATION);
}
@Override
@Deprecated
public <K, V> void forward(final K key, final V value, final String childName) {
- throw new StreamsException("ProcessorContext#forward() not supported.");
+ throw new StreamsException(EXPLANATION);
}
@Override