Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/30 04:42:00 UTC

[jira] [Commented] (KAFKA-4772) Exploit #peek to implement #print() and other methods

    [ https://issues.apache.org/jira/browse/KAFKA-4772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344492#comment-16344492 ] 

ASF GitHub Bot commented on KAFKA-4772:
---------------------------------------

guozhangwang closed pull request #2669: KAFKA-4772: [WIP] Use peek to implement print
URL: https://github.com/apache/kafka/pull/2669
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index a11d8f443b8..ab451dc3fdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -325,6 +325,25 @@ void print(final Serde<K> keySerde,
                final Serde<V> valSerde,
                final String streamName);
 
+    /**
+     * Print the records of this stream to {@code System.out}.
+     * <p>
+     * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
+     * {@code toString()} on the deserialized object.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     *
+     * @param keySerde   key serde used to deserialize key if type is {@code byte[]}
+     * @param valSerde   value serde used to deserialize value if type is {@code byte[]}
+     * @param streamName the name used to label the key/value pairs printed to the console
+     * @param mapper     mapper to allow customized output of key and value
+     */
+    void print(final Serde<K> keySerde,
+               final Serde<V> valSerde,
+               final String streamName,
+               final KeyValueMapper<K, V, String> mapper);
+
     /**
      * Write the records of this stream to a file at the given path.
      * This function will use the generated name of the parent processor node to label the key/value pairs printed to
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 79abbb558eb..92c32b8c14c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -190,11 +190,42 @@ public void print(Serde<K> keySerde, Serde<V> valSerde) {
 
     @Override
     public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) {
+        print(keySerde, valSerde, streamName, null);
+    }
+
+    @Override
+    public void print(final Serde<K> keySerde, final Serde<V> valSerde, String streamName, final KeyValueMapper<K, V, String> mapper) {
         String name = topology.newName(PRINTING_NAME);
         streamName = (streamName == null) ? this.name : streamName;
-        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde, streamName), this.name);
+        topology.addProcessor(name, new KStreamPeek<>(printAction(System.out, keySerde, valSerde, streamName, mapper)), this.name);
     }
 
+    private static <K, V> ForeachAction<K, V> printAction(final PrintStream printStream, final Serde<?> keySerde, final Serde<?> valueSerde, final String streamName, final KeyValueMapper<K, V, String> mapper) {
+        return new ForeachAction<K, V>() {
+            @Override
+            public void apply(final K key, final V value) {
+                K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
+                V valueToPrint = (V) maybeDeserialize(value, valueSerde.deserializer());
+                if (mapper == null) {
+                    printStream.println("[" + streamName + "]: " + keyToPrint + " , " + valueToPrint);
+                } else {
+                    printStream.println("[" + streamName + "]: " + mapper.apply(keyToPrint, valueToPrint));
+                }
+            }
+
+            private Object maybeDeserialize(Object receivedElement, Deserializer<?> deserializer) {
+                if (receivedElement == null) {
+                    return null;
+                }
+
+                if (receivedElement instanceof byte[]) {
+                    return deserializer.deserialize("Topic", (byte[]) receivedElement);
+                }
+
+                return receivedElement;
+            }
+        };
+    }
 
     @Override
     public void writeAsText(String filePath) {
@@ -226,7 +257,7 @@ public void writeAsText(String filePath, String streamName, Serde<K> keySerde, S
         try {
 
             PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
-            topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde, streamName), this.name);
+            topology.addProcessor(name, new KStreamPeek<>(printAction(printStream, keySerde, valSerde, streamName, null)), this.name);
 
         } catch (FileNotFoundException e) {
             String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
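
The diff routes both `print()` and `writeAsText()` through one `ForeachAction` wrapped in `KStreamPeek`. Below is a minimal, dependency-free sketch of that `printAction` idea, not the PR's code. It assumes JDK stand-ins: `BiConsumer` for Kafka's `ForeachAction`, `BiFunction` for `KeyValueMapper`, and `Function<byte[], ?>` for a serde's `Deserializer`; the class name `PrintActionSketch` is hypothetical. Unlike the diff, which calls `keySerde.deserializer()` unconditionally (a `NullPointerException` if a caller passes a null serde) and hands the deserializer a placeholder topic name `"Topic"`, the sketch tolerates a null deserializer and takes no topic at all.

```java
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-ins: BiConsumer ~ ForeachAction, BiFunction ~ KeyValueMapper,
// Function<byte[], ?> ~ Deserializer. These are JDK types, not Kafka Streams APIs.
final class PrintActionSketch {

    private PrintActionSketch() { }

    // Deserialize only byte[] payloads. A null deserializer leaves the bytes as-is
    // instead of failing with a NullPointerException.
    static Object maybeDeserialize(final Object element, final Function<byte[], ?> deserializer) {
        if (element instanceof byte[] && deserializer != null) {
            return deserializer.apply((byte[]) element);
        }
        return element; // null and non-byte[] values pass through unchanged
    }

    // Builds one action that a console printer and a file printer could share.
    static <K, V> BiConsumer<K, V> printAction(final PrintStream out,
                                               final Function<byte[], ?> keyDeserializer,
                                               final Function<byte[], ?> valueDeserializer,
                                               final String streamName,
                                               final BiFunction<Object, Object, String> mapper) {
        return (key, value) -> {
            final Object k = maybeDeserialize(key, keyDeserializer);
            final Object v = maybeDeserialize(value, valueDeserializer);
            // Same default layout as the diff: "[name]: key , value"
            final String body = (mapper == null) ? k + " , " + v : mapper.apply(k, v);
            out.println("[" + streamName + "]: " + body);
        };
    }

    public static void main(final String[] args) {
        final Function<byte[], String> utf8 = bytes -> new String(bytes, StandardCharsets.UTF_8);
        final BiConsumer<byte[], byte[]> action = printAction(System.out, utf8, utf8, "orders", null);
        action.accept("k1".getBytes(StandardCharsets.UTF_8), "v1".getBytes(StandardCharsets.UTF_8));
        // prints: [orders]: k1 , v1
    }
}
```

Typing the mapper over `Object` is a deliberate departure from the diff, which casts the deserialized result back to `K` and `V` with unchecked casts even though a deserialized value need not be of the stream's declared type.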


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Exploit #peek to implement #print() and other methods
> -----------------------------------------------------
>
>                 Key: KAFKA-4772
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4772
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: james chien
>            Priority: Minor
>              Labels: beginner, newbie
>             Fix For: 0.11.0.0
>
>
> From: https://github.com/apache/kafka/pull/2493#pullrequestreview-22157555
> Things that I can think of:
> - print / writeAsText can be a special implementation of peek; KStreamPrint etc. can be removed.
> - consider collapsing KStreamPeek and KStreamForeach into one processor with a flag parameter indicating whether the processed key-value pair should still be forwarded.
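
The second suggestion, merging `KStreamPeek` and `KStreamForeach` into one processor with a forwarding flag, can be sketched as below. This is a hypothetical, Kafka-free model: `ActionProcessor` and its `forwardDownstream` flag are invented names, and the downstream `BiConsumer` stands in for forwarding a record through the real processor context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch: one processor type covers both foreach (terminal) and peek (pass-through).
final class ForeachOrPeekSketch {

    static final class ActionProcessor<K, V> {
        private final BiConsumer<? super K, ? super V> action;
        private final boolean forwardDownstream; // true = peek semantics, false = foreach semantics
        private final BiConsumer<K, V> downstream;  // stand-in for forwarding to child processors

        ActionProcessor(final BiConsumer<? super K, ? super V> action,
                        final boolean forwardDownstream,
                        final BiConsumer<K, V> downstream) {
            this.action = action;
            this.forwardDownstream = forwardDownstream;
            this.downstream = downstream;
        }

        void process(final K key, final V value) {
            action.accept(key, value);         // the side effect runs in both modes
            if (forwardDownstream) {
                downstream.accept(key, value); // peek: pass the record on unchanged
            }
        }
    }

    public static void main(final String[] args) {
        final List<String> seen = new ArrayList<>();
        final List<String> forwarded = new ArrayList<>();
        final BiConsumer<String, Integer> record = (k, v) -> seen.add(k + "=" + v);
        final BiConsumer<String, Integer> sink = (k, v) -> forwarded.add(k + "=" + v);

        new ActionProcessor<>(record, true, sink).process("a", 1);  // peek
        new ActionProcessor<>(record, false, sink).process("b", 2); // foreach

        System.out.println(seen + " " + forwarded); // prints: [a=1, b=2] [a=1]
    }
}
```

With a single flag, `print` and `writeAsText` (pass-through not needed) and `peek` (pass-through required) could share one processor class instead of three.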



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)