You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/22 18:18:38 UTC

[pulsar] branch master updated: Pulsar Client Tools: handle every data type with 'pulsar-client consume' (#10301)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 27156e4  Pulsar Client Tools: handle every data type with 'pulsar-client consume' (#10301)
27156e4 is described below

commit 27156e4866298c44e83724eeb10a4ee67317433c
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Thu Apr 22 20:17:54 2021 +0200

    Pulsar Client Tools: handle every data type with 'pulsar-client consume' (#10301)
---
 .../org/apache/pulsar/client/cli/CmdConsume.java   | 88 ++++++++++++++++------
 1 file changed, 66 insertions(+), 22 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 0e4f418..b614be0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -23,6 +23,7 @@ import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
@@ -35,6 +36,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -48,8 +50,10 @@ import org.apache.commons.io.HexDump;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.websocket.api.RemoteEndpoint;
@@ -164,15 +168,9 @@ public class CmdConsume {
             data = "null";
         } else if (value instanceof byte[]) {
             byte[] msgData = (byte[]) value;
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            if (!displayHex) {
-                data = new String(msgData);
-            } else {
-                HexDump.dump(msgData, 0, out, 0);
-                data = new String(out.toByteArray());
-            }
-        } else if (value instanceof GenericRecord) {
-            Map<String, Object> asMap = genericRecordToMap((GenericRecord) value);
+            data = interpretByteArray(displayHex, msgData);
+        } else if (value instanceof GenericObject) {
+            Map<String, Object> asMap = genericObjectToMap((GenericObject) value, displayHex);
             data = asMap.toString();
         } else if (value instanceof ByteBuffer) {
             data = new String(getBytes((ByteBuffer) value));
@@ -194,19 +192,65 @@ public class CmdConsume {
         return sb.toString();
     }
 
-    private static Map<String, Object> genericRecordToMap(GenericRecord value) {
-        return value.getFields()
-                .stream()
-                .collect(Collectors.toMap(Field::getName, f -> {
-                    Object fieldValue = value.getField(f);
-                    if (fieldValue instanceof GenericRecord) {
-                        return genericRecordToMap((GenericRecord) fieldValue);
-                    } else if (fieldValue == null) {
-                        return "null";
-                    } else {
-                        return fieldValue;
-                    }
-                }));
+    private static String interpretByteArray(boolean displayHex, byte[] msgData) throws IOException {
+        String data;
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        if (!displayHex) {
+            return new String(msgData);
+        } else {
+            HexDump.dump(msgData, 0, out, 0);
+            return  new String(out.toByteArray());
+        }
+    }
+
+    private static Map<String, Object> genericObjectToMap(GenericObject value, boolean displayHex) throws IOException {
+        switch (value.getSchemaType()) {
+            case AVRO:
+            case JSON:
+            case PROTOBUF_NATIVE:
+                    return genericRecordToMap((GenericRecord) value, displayHex);
+            case KEY_VALUE:
+                    return keyValueToMap((KeyValue) value.getNativeObject(), displayHex);
+            default:
+                return primitiveValueToMap(value.getNativeObject(), displayHex);
+        }
+    }
+
+    private static Map<String, Object> keyValueToMap(KeyValue value, boolean displayHex) throws IOException {
+        if (value == null) {
+            return ImmutableMap.of("value", "NULL");
+        }
+        return ImmutableMap.of("key", primitiveValueToMap(value.getKey(), displayHex),
+                "value", primitiveValueToMap(value.getValue(), displayHex));
+    }
+
+    private static Map<String, Object> primitiveValueToMap(Object value, boolean displayHex) throws IOException {
+        if (value == null) {
+            return ImmutableMap.of("value", "NULL");
+        }
+        if (value instanceof GenericObject) {
+            return genericObjectToMap((GenericObject) value, displayHex);
+        }
+        if (value instanceof byte[]) {
+            value = interpretByteArray(displayHex, (byte[]) value);
+        }
+        return ImmutableMap.of("value", value.toString(), "type", value.getClass());
+    }
+
+    private static Map<String, Object> genericRecordToMap(GenericRecord value, boolean displayHex) throws IOException {
+        Map<String, Object> res = new HashMap<>();
+        for (Field f : value.getFields()) {
+            Object fieldValue = value.getField(f);
+            if (fieldValue instanceof GenericRecord) {
+                fieldValue = genericRecordToMap((GenericRecord) fieldValue, displayHex);
+            } else if (fieldValue == null) {
+                fieldValue =  "NULL";
+            } else if (fieldValue instanceof byte[]) {
+                fieldValue = interpretByteArray(displayHex, (byte[]) fieldValue);
+            }
+            res.put(f.getName(), fieldValue);
+        }
+        return res;
     }
 
     /**