You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/04 22:50:17 UTC

[kafka] branch 2.6 updated: MINOR: Add explanation for disabling forwarding from value transformers (#8771)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new cb310f1  MINOR: Add explanation for disabling forwarding from value transformers (#8771)
cb310f1 is described below

commit cb310f1e4350e98e56c7df9cb38d827eb6304667
Author: Antony Stubbs <an...@gmail.com>
AuthorDate: Thu Jun 4 23:42:13 2020 +0100

    MINOR: Add explanation for disabling forwarding from value transformers (#8771)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 .../internals/ForwardingDisabledProcessorContext.java       | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
index ba39368..2b8043a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -40,6 +40,11 @@ import java.util.Objects;
 public final class ForwardingDisabledProcessorContext implements ProcessorContext {
     private final ProcessorContext delegate;
 
+    private static final String EXPLANATION = "ProcessorContext#forward() is not supported from this context, "
+        + "as the framework must ensure the key is not changed (#forward allows changing the key on "
+        + "messages which are sent). Try another function, which doesn't allow the key to be changed "
+        + "(for example - #tranformValues).";
+
     public ForwardingDisabledProcessorContext(final ProcessorContext delegate) {
         this.delegate = Objects.requireNonNull(delegate, "delegate");
     }
@@ -102,24 +107,24 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
 
     @Override
     public <K, V> void forward(final K key, final V value) {
-        throw new StreamsException("ProcessorContext#forward() not supported.");
+        throw new StreamsException(EXPLANATION);
     }
 
     @Override
     public <K, V> void forward(final K key, final V value, final To to) {
-        throw new StreamsException("ProcessorContext#forward() not supported.");
+        throw new StreamsException(EXPLANATION);
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key, final V value, final int childIndex) {
-        throw new StreamsException("ProcessorContext#forward() not supported.");
+        throw new StreamsException(EXPLANATION);
     }
 
     @Override
     @Deprecated
     public <K, V> void forward(final K key, final V value, final String childName) {
-        throw new StreamsException("ProcessorContext#forward() not supported.");
+        throw new StreamsException(EXPLANATION);
     }
 
     @Override