You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/22 18:18:38 UTC
[pulsar] branch master updated: Pulsar Client Tools: handle every data type with 'pulsar-client consume' (#10301)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 27156e4 Pulsar Client Tools: handle every data type with 'pulsar-client consume' (#10301)
27156e4 is described below
commit 27156e4866298c44e83724eeb10a4ee67317433c
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Thu Apr 22 20:17:54 2021 +0200
Pulsar Client Tools: handle every data type with 'pulsar-client consume' (#10301)
---
.../org/apache/pulsar/client/cli/CmdConsume.java | 88 ++++++++++++++++------
1 file changed, 66 insertions(+), 22 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 0e4f418..b614be0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -23,6 +23,7 @@ import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
@@ -35,6 +36,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -48,8 +50,10 @@ import org.apache.commons.io.HexDump;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
@@ -164,15 +168,9 @@ public class CmdConsume {
data = "null";
} else if (value instanceof byte[]) {
byte[] msgData = (byte[]) value;
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- if (!displayHex) {
- data = new String(msgData);
- } else {
- HexDump.dump(msgData, 0, out, 0);
- data = new String(out.toByteArray());
- }
- } else if (value instanceof GenericRecord) {
- Map<String, Object> asMap = genericRecordToMap((GenericRecord) value);
+ data = interpretByteArray(displayHex, msgData);
+ } else if (value instanceof GenericObject) {
+ Map<String, Object> asMap = genericObjectToMap((GenericObject) value, displayHex);
data = asMap.toString();
} else if (value instanceof ByteBuffer) {
data = new String(getBytes((ByteBuffer) value));
@@ -194,19 +192,65 @@ public class CmdConsume {
return sb.toString();
}
- private static Map<String, Object> genericRecordToMap(GenericRecord value) {
- return value.getFields()
- .stream()
- .collect(Collectors.toMap(Field::getName, f -> {
- Object fieldValue = value.getField(f);
- if (fieldValue instanceof GenericRecord) {
- return genericRecordToMap((GenericRecord) fieldValue);
- } else if (fieldValue == null) {
- return "null";
- } else {
- return fieldValue;
- }
- }));
+ private static String interpretByteArray(boolean displayHex, byte[] msgData) throws IOException {
+ String data;
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ if (!displayHex) {
+ return new String(msgData);
+ } else {
+ HexDump.dump(msgData, 0, out, 0);
+ return new String(out.toByteArray());
+ }
+ }
+
+ private static Map<String, Object> genericObjectToMap(GenericObject value, boolean displayHex) throws IOException {
+ switch (value.getSchemaType()) {
+ case AVRO:
+ case JSON:
+ case PROTOBUF_NATIVE:
+ return genericRecordToMap((GenericRecord) value, displayHex);
+ case KEY_VALUE:
+ return keyValueToMap((KeyValue) value.getNativeObject(), displayHex);
+ default:
+ return primitiveValueToMap(value.getNativeObject(), displayHex);
+ }
+ }
+
+ private static Map<String, Object> keyValueToMap(KeyValue value, boolean displayHex) throws IOException {
+ if (value == null) {
+ return ImmutableMap.of("value", "NULL");
+ }
+ return ImmutableMap.of("key", primitiveValueToMap(value.getKey(), displayHex),
+ "value", primitiveValueToMap(value.getValue(), displayHex));
+ }
+
+ private static Map<String, Object> primitiveValueToMap(Object value, boolean displayHex) throws IOException {
+ if (value == null) {
+ return ImmutableMap.of("value", "NULL");
+ }
+ if (value instanceof GenericObject) {
+ return genericObjectToMap((GenericObject) value, displayHex);
+ }
+ if (value instanceof byte[]) {
+ value = interpretByteArray(displayHex, (byte[]) value);
+ }
+ return ImmutableMap.of("value", value.toString(), "type", value.getClass());
+ }
+
+ private static Map<String, Object> genericRecordToMap(GenericRecord value, boolean displayHex) throws IOException {
+ Map<String, Object> res = new HashMap<>();
+ for (Field f : value.getFields()) {
+ Object fieldValue = value.getField(f);
+ if (fieldValue instanceof GenericRecord) {
+ fieldValue = genericRecordToMap((GenericRecord) fieldValue, displayHex);
+ } else if (fieldValue == null) {
+ fieldValue = "NULL";
+ } else if (fieldValue instanceof byte[]) {
+ fieldValue = interpretByteArray(displayHex, (byte[]) fieldValue);
+ }
+ res.put(f.getName(), fieldValue);
+ }
+ return res;
}
/**