You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/08/19 18:31:31 UTC
[kafka] branch 2.6 updated: Revert KAFKA-9309: Add the ability to
translate Message to JSON (#9197)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 232a0f4 Revert KAFKA-9309: Add the ability to translate Message to JSON (#9197)
232a0f4 is described below
commit 232a0f48d3a2629578090937fb3b5097b78bd617
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Wed Aug 19 11:30:34 2020 -0700
Revert KAFKA-9309: Add the ability to translate Message to JSON (#9197)
This reverts commit bf6dffe93bbe0fe33ad076ebccebb840d66b936d
Reviewers: Ismael Juma <is...@confluent.io>
---
checkstyle/import-control.xml | 5 -
.../org/apache/kafka/common/protocol/Message.java | 42 ---
.../apache/kafka/common/protocol/MessageUtil.java | 104 +------
.../apache/kafka/common/message/MessageTest.java | 11 -
.../apache/kafka/message/MessageDataGenerator.java | 323 ---------------------
.../org/apache/kafka/message/MessageGenerator.java | 24 --
6 files changed, 1 insertion(+), 508 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 8c3cb286..3dac90e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -123,7 +123,6 @@
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.resource" />
- <allow pkg="com.fasterxml.jackson" />
</subpackage>
<subpackage name="record">
@@ -244,10 +243,6 @@
<allow pkg="org.apache.kafka.connect.json" />
</subpackage>
- <subpackage name="internals">
- <allow pkg="com.fasterxml.jackson" />
- </subpackage>
-
<subpackage name="perf">
<allow pkg="com.fasterxml.jackson.databind" />
</subpackage>
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Message.java b/clients/src/main/java/org/apache/kafka/common/protocol/Message.java
index 3ff3304..2a313ff 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Message.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Message.java
@@ -17,8 +17,6 @@
package org.apache.kafka.common.protocol;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.protocol.types.RawTaggedField;
import org.apache.kafka.common.protocol.types.Struct;
@@ -103,46 +101,6 @@ public interface Message {
Struct toStruct(short version);
/**
- * Reads this message from a Jackson JsonNode object. This will overwrite
- * all relevant fields with information from the Struct.
- *
- * For the most part, we expect every JSON object in the input to be the
- * correct type. There is one exception: we will deserialize numbers
- * represented as strings. If the numeric string begins with 0x, we will
- * treat the number as hexadecimal.
- *
- * Note that we expect to see NullNode objects created for null entries.
- * Therefore, please configure your Jackson ObjectMapper with
- * setSerializationInclusion({@link JsonInclude.Include#ALWAYS}).
- * Other settings may silently omit the nulls, which is not the
- * semantic that Kafka RPC uses. (Including a field and setting it to
- * null is different than not including the field.)
- *
- * @param node The source node.
- * @param version The version to use.
- *
- * @throws {@see org.apache.kafka.common.errors.UnsupportedVersionException}
- * If the specified JSON can't be processed with the
- * specified message version.
- */
- void fromJson(JsonNode node, short version);
-
- /**
- * Convert this message to a JsonNode.
- *
- * Note that 64-bit numbers will be serialized as strings rather than as integers.
- * The reason is because JavaScript can't represent numbers above 2**52 accurately.
- * Therefore, for maximum interoperability, we represent these numbers as strings.
- *
- * @param version The version to use.
- *
- * @throws {@see org.apache.kafka.common.errors.UnsupportedVersionException}
- * If the specified version is too new to be supported
- * by this software.
- */
- JsonNode toJson(short version);
-
- /**
* Returns a list of tagged fields which this software can't understand.
*
* @return The raw tagged fields.
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java b/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java
index 21eb35e..62f8837 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java
@@ -17,13 +17,11 @@
package org.apache.kafka.common.protocol;
-import com.fasterxml.jackson.databind.JsonNode;
-
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.UUID;
+
public final class MessageUtil {
public static final UUID ZERO_UUID = new UUID(0L, 0L);
@@ -55,106 +53,6 @@ public final class MessageUtil {
return bld.toString();
}
- public static byte jsonNodeToByte(JsonNode node, String about) {
- int value = jsonNodeToInt(node, about);
- if (value > Byte.MAX_VALUE) {
- if (value <= 256) {
- // It's more traditional to refer to bytes as unsigned,
- // so we support that here.
- value -= 128;
- } else {
- throw new RuntimeException(about + ": value " + value +
- " does not fit in an 8-bit signed integer.");
- }
- }
- if (value < Byte.MIN_VALUE) {
- throw new RuntimeException(about + ": value " + value +
- " does not fit in an 8-bit signed integer.");
- }
- return (byte) value;
- }
-
- public static short jsonNodeToShort(JsonNode node, String about) {
- int value = jsonNodeToInt(node, about);
- if ((value < Short.MIN_VALUE) || (value > Short.MAX_VALUE)) {
- throw new RuntimeException(about + ": value " + value +
- " does not fit in a 16-bit signed integer.");
- }
- return (short) value;
- }
-
- public static int jsonNodeToInt(JsonNode node, String about) {
- if (node.isInt()) {
- return node.asInt();
- }
- if (node.isTextual()) {
- throw new NumberFormatException(about + ": expected an integer or " +
- "string type, but got " + node.getNodeType());
- }
- String text = node.asText();
- if (text.startsWith("0x")) {
- try {
- return Integer.parseInt(text.substring(2), 16);
- } catch (NumberFormatException e) {
- throw new NumberFormatException(about + ": failed to " +
- "parse hexadecimal number: " + e.getMessage());
- }
- } else {
- try {
- return Integer.parseInt(text);
- } catch (NumberFormatException e) {
- throw new NumberFormatException(about + ": failed to " +
- "parse number: " + e.getMessage());
- }
- }
- }
-
- public static long jsonNodeToLong(JsonNode node, String about) {
- if (node.isLong()) {
- return node.asLong();
- }
- if (node.isTextual()) {
- throw new NumberFormatException(about + ": expected an integer or " +
- "string type, but got " + node.getNodeType());
- }
- String text = node.asText();
- if (text.startsWith("0x")) {
- try {
- return Long.parseLong(text.substring(2), 16);
- } catch (NumberFormatException e) {
- throw new NumberFormatException(about + ": failed to " +
- "parse hexadecimal number: " + e.getMessage());
- }
- } else {
- try {
- return Long.parseLong(text);
- } catch (NumberFormatException e) {
- throw new NumberFormatException(about + ": failed to " +
- "parse number: " + e.getMessage());
- }
- }
- }
-
- public static byte[] jsonNodeToBinary(JsonNode node, String about) {
- if (!node.isBinary()) {
- throw new RuntimeException(about + ": expected Base64-encoded binary data.");
- }
- try {
- byte[] value = node.binaryValue();
- return value;
- } catch (IOException e) {
- throw new RuntimeException(about + ": unable to retrieve Base64-encoded binary data", e);
- }
- }
-
- public static double jsonNodeToDouble(JsonNode node, String about) {
- if (!node.isFloatingPointNumber()) {
- throw new NumberFormatException(about + ": expected a floating point " +
- "type, but got " + node.getNodeType());
- }
- return node.asDouble();
- }
-
public static byte[] duplicate(byte[] array) {
if (array == null) {
return null;
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index d640f44..e0d7047 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.common.message;
-import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
@@ -753,7 +752,6 @@ public final class MessageTest {
private void testEquivalentMessageRoundTrip(short version, Message message) throws Exception {
testStructRoundTrip(version, message, message);
testByteBufferRoundTrip(version, message, message);
- testJsonRoundTrip(version, message, message);
}
private void testByteBufferRoundTrip(short version, Message message, Message expected) throws Exception {
@@ -784,15 +782,6 @@ public final class MessageTest {
assertEquals(expected.toString(), message2.toString());
}
- private void testJsonRoundTrip(short version, Message message, Message expected) throws Exception {
- JsonNode jsonNode = message.toJson(version);
- Message message2 = message.getClass().newInstance();
- message2.fromJson(jsonNode, version);
- assertEquals(expected, message2);
- assertEquals(expected.hashCode(), message2.hashCode());
- assertEquals(expected.toString(), message2.toString());
- }
-
/**
* Verify that the JSON files support the same message versions as the
* schemas accessible through the ApiKey class.
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index 90c8bbe..46ea053 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -98,10 +98,6 @@ public final class MessageDataGenerator {
buffer.printf("%n");
generateClassToStruct(className, struct, parentVersions);
buffer.printf("%n");
- generateClassFromJson(className, struct, parentVersions);
- buffer.printf("%n");
- generateClassToJson(className, struct, parentVersions);
- buffer.printf("%n");
generateClassSize(className, struct, parentVersions);
if (isSetElement) {
buffer.printf("%n");
@@ -440,15 +436,6 @@ public final class MessageDataGenerator {
buffer.printf("}%n");
buffer.printf("%n");
- headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
- buffer.printf("public %s(JsonNode _node, short _version) {%n", className);
- buffer.incrementIndent();
- buffer.printf("fromJson(_node, _version);%n");
- generateConstructorEpilogue(isSetElement);
- buffer.decrementIndent();
- buffer.printf("}%n");
- buffer.printf("%n");
-
buffer.printf("public %s() {%n", className);
buffer.incrementIndent();
for (FieldSpec field : struct.fields()) {
@@ -823,316 +810,6 @@ public final class MessageDataGenerator {
buffer.printf("}%n");
}
- private void generateClassFromJson(String className, StructSpec struct,
- Versions parentVersions) {
- headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
- buffer.printf("@Override%n");
- buffer.printf("public void fromJson(JsonNode _node, short _version) {%n");
- buffer.incrementIndent();
- VersionConditional.forVersions(struct.versions(), parentVersions).
- allowMembershipCheckAlwaysFalse(false).
- ifNotMember(__ -> {
- headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
- buffer.printf("throw new UnsupportedVersionException(\"Can't read " +
- "version \" + _version + \" of %s\");%n", className);
- }).
- generate(buffer);
- Versions curVersions = parentVersions.intersect(struct.versions());
- for (FieldSpec field : struct.fields()) {
- String sourceVariable = String.format("_%sNode", field.camelCaseName());
- buffer.printf("JsonNode %s = _node.get(\"%s\");%n",
- sourceVariable,
- field.camelCaseName());
- buffer.printf("if (%s == null) {%n", sourceVariable);
- buffer.incrementIndent();
- Versions mandatoryVersions = field.versions().subtract(field.taggedVersions());
- VersionConditional.forVersions(mandatoryVersions, curVersions).
- ifMember(__ -> {
- buffer.printf("throw new RuntimeException(\"%s: unable to locate " +
- "field \'%s\', which is mandatory in version \" + _version);%n",
- className, field.camelCaseName());
- }).
- ifNotMember(__ -> {
- buffer.printf("this.%s = %s;%n", field.camelCaseName(), fieldDefault(field));
- }).
- generate(buffer);
- buffer.decrementIndent();
- buffer.printf("} else {%n");
- buffer.incrementIndent();
- VersionConditional.forVersions(struct.versions(), curVersions).
- ifMember(presentVersions -> {
- generateTargetFromJson(new Target(field,
- sourceVariable,
- className,
- input -> String.format("this.%s = %s", field.camelCaseName(), input)),
- curVersions);
- }).ifNotMember(__ -> {
- buffer.printf("throw new RuntimeException(\"%s: field \'%s\' is not " +
- "supported in version \" + _version);%n",
- className, field.camelCaseName());
- }).
- generate(buffer);
- buffer.decrementIndent();
- buffer.printf("}%n");
- }
- buffer.decrementIndent();
- buffer.printf("}%n");
- }
-
- private void generateTargetFromJson(Target target, Versions curVersions) {
- if (target.field().type() instanceof FieldType.BoolFieldType) {
- buffer.printf("if (!%s.isBoolean()) {%n", target.sourceVariable());
- buffer.incrementIndent();
- buffer.printf("throw new RuntimeException(\"%s expected Boolean type, " +
- "but got \" + _node.getNodeType());%n", target.humanReadableName());
- buffer.decrementIndent();
- buffer.printf("}%n");
- buffer.printf("%s;%n", target.assignmentStatement(
- target.sourceVariable() + ".asBoolean()"));
- } else if (target.field().type() instanceof FieldType.Int8FieldType) {
- headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("MessageUtil.jsonNodeToByte(%s, \"%s\")",
- target.sourceVariable(), target.humanReadableName())));
- } else if (target.field().type() instanceof FieldType.Int16FieldType) {
- headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("MessageUtil.jsonNodeToShort(%s, \"%s\")",
- target.sourceVariable(), target.humanReadableName())));
- } else if (target.field().type() instanceof FieldType.Int32FieldType) {
- headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("MessageUtil.jsonNodeToInt(%s, \"%s\")",
- target.sourceVariable(), target.humanReadableName())));
- } else if (target.field().type() instanceof FieldType.Int64FieldType) {
- headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("MessageUtil.jsonNodeToLong(%s, \"%s\")",
- target.sourceVariable(), target.humanReadableName())));
- } else if (target.field().type() instanceof FieldType.UUIDFieldType) {
- buffer.printf("if (!%s.isTextual()) {%n", target.sourceVariable());
- buffer.incrementIndent();
- buffer.printf("throw new RuntimeException(\"%s expected a JSON string " +
- "type, but got \" + _node.getNodeType());%n", target.humanReadableName());
- buffer.decrementIndent();
- buffer.printf("}%n");
- headerGenerator.addImport(MessageGenerator.UUID_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(String.format(
- "UUID.fromString(%s.asText())", target.sourceVariable())));
- } else if (target.field().type() instanceof FieldType.Float64FieldType) {
- headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("MessageUtil.jsonNodeToDouble(%s, \"%s\")",
- target.sourceVariable(), target.humanReadableName())));
- } else {
- // Handle the variable length types. All of them are potentially
- // nullable, so handle that here.
- IsNullConditional.forName(target.sourceVariable()).
- nullableVersions(target.field().nullableVersions()).
- possibleVersions(curVersions).
- conditionalGenerator((name, negated) ->
- String.format("%s%s.isNull()", negated ? "!" : "", name)).
- ifNull(() -> {
- buffer.printf("%s;%n", target.assignmentStatement("null"));
- }).
- ifShouldNotBeNull(() -> {
- generateVariableLengthTargetFromJson(target, curVersions);
- }).
- generate(buffer);
- }
- }
-
- private void generateVariableLengthTargetFromJson(Target target, Versions curVersions) {
- if (target.field().type().isString()) {
- buffer.printf("if (!%s.isTextual()) {%n", target.sourceVariable());
- buffer.incrementIndent();
- buffer.printf("throw new RuntimeException(\"%s expected a string " +
- "type, but got \" + _node.getNodeType());%n", target.humanReadableName());
- buffer.decrementIndent();
- buffer.printf("}%n");
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("%s.asText()", target.sourceVariable())));
- } else if (target.field().type().isBytes()) {
- headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
- if (target.field().zeroCopy()) {
- headerGenerator.addImport(MessageGenerator.BYTE_BUFFER_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("ByteBuffer.wrap(MessageUtil.jsonNodeToBinary(%s, \"%s\"))",
- target.sourceVariable(), target.humanReadableName())));
- } else {
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("MessageUtil.jsonNodeToBinary(%s, \"%s\")",
- target.sourceVariable(), target.humanReadableName())));
- }
- } else if (target.field().type().isArray()) {
- buffer.printf("if (!%s.isArray()) {%n", target.sourceVariable());
- buffer.incrementIndent();
- buffer.printf("throw new RuntimeException(\"%s expected a JSON " +
- "array, but got \" + _node.getNodeType());%n", target.humanReadableName());
- buffer.decrementIndent();
- buffer.printf("}%n");
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("new %s()", fieldConcreteJavaType(target.field()))));
- headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
- buffer.printf("for (JsonNode _element : %s) {%n", target.sourceVariable());
- buffer.incrementIndent();
- generateTargetFromJson(target.arrayElementTarget(
- input -> String.format("%s.add(%s)", target.field().camelCaseName(), input)),
- curVersions);
- buffer.decrementIndent();
- buffer.printf("}%n");
- } else if (target.field().type().isStruct()) {
- buffer.printf("%s;%n", target.assignmentStatement(String.format("new %s(%s, _version)",
- target.field().type().toString(),
- target.sourceVariable())));
- } else {
- throw new RuntimeException("Unexpected type " + target.field().type());
- }
- }
-
- private void generateClassToJson(String className, StructSpec struct,
- Versions parentVersions) {
- headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
- buffer.printf("@Override%n");
- buffer.printf("public JsonNode toJson(short _version) {%n");
- buffer.incrementIndent();
- VersionConditional.forVersions(struct.versions(), parentVersions).
- allowMembershipCheckAlwaysFalse(false).
- ifNotMember(__ -> {
- headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
- buffer.printf("throw new UnsupportedVersionException(\"Can't write " +
- "version \" + _version + \" of %s\");%n", className);
- }).
- generate(buffer);
- Versions curVersions = parentVersions.intersect(struct.versions());
- headerGenerator.addImport(MessageGenerator.OBJECT_NODE_CLASS);
- headerGenerator.addImport(MessageGenerator.JSON_NODE_FACTORY_CLASS);
- buffer.printf("ObjectNode _node = new ObjectNode(JsonNodeFactory.instance);%n");
- for (FieldSpec field : struct.fields()) {
- Target target = new Target(field,
- String.format("this.%s", field.camelCaseName()),
- field.camelCaseName(),
- input -> String.format("_node.set(\"%s\", %s)", field.camelCaseName(), input));
- VersionConditional cond = VersionConditional.forVersions(field.versions(), curVersions).
- ifMember(presentVersions -> {
- VersionConditional.forVersions(field.taggedVersions(), presentVersions).
- ifMember(presentAndTaggedVersions -> {
- generateNonDefaultValueCheck(field, field.nullableVersions());
- buffer.incrementIndent();
- if (field.defaultString().equals("null")) {
- // If the default was null, and we already checked that this field was not
- // the default, we can omit further null checks.
- generateTargetToJson(target.nonNullableCopy(), presentAndTaggedVersions);
- } else {
- generateTargetToJson(target, presentAndTaggedVersions);
- }
- buffer.decrementIndent();
- buffer.printf("}%n");
- }).
- ifNotMember(presentAndNotTaggedVersions -> {
- generateTargetToJson(target, presentAndNotTaggedVersions);
- }).
- generate(buffer);
- });
- if (!field.ignorable()) {
- cond.ifNotMember(__ -> {
- generateNonIgnorableFieldCheck(field);
- });
- }
- cond.generate(buffer);
- }
- buffer.printf("return _node;%n");
- buffer.decrementIndent();
- buffer.printf("}%n");
- }
-
- private void generateTargetToJson(Target target, Versions versions) {
- if (target.field().type() instanceof FieldType.BoolFieldType) {
- headerGenerator.addImport(MessageGenerator.BOOLEAN_NODE_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("BooleanNode.valueOf(%s)", target.sourceVariable())));
- } else if ((target.field().type() instanceof FieldType.Int8FieldType) ||
- (target.field().type() instanceof FieldType.Int16FieldType)) {
- headerGenerator.addImport(MessageGenerator.SHORT_NODE_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("new ShortNode(%s)", target.sourceVariable())));
- } else if (target.field().type() instanceof FieldType.Int32FieldType) {
- headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("new IntNode(%s)", target.sourceVariable())));
- } else if (target.field().type() instanceof FieldType.Int64FieldType) {
- headerGenerator.addImport(MessageGenerator.LONG_NODE_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("new LongNode(%s)", target.sourceVariable())));
- } else if (target.field().type() instanceof FieldType.UUIDFieldType) {
- headerGenerator.addImport(MessageGenerator.TEXT_NODE_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("new TextNode(%s.toString())", target.sourceVariable())));
- } else if (target.field().type() instanceof FieldType.Float64FieldType) {
- headerGenerator.addImport(MessageGenerator.DOUBLE_NODE_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("new DoubleNode(%s)", target.sourceVariable())));
- } else {
- // Handle the variable length types. All of them are potentially
- // nullable, so handle that here.
- IsNullConditional.forName(target.sourceVariable()).
- nullableVersions(target.field().nullableVersions()).
- possibleVersions(versions).
- conditionalGenerator((name, negated) ->
- String.format("%s %s= null", name, negated ? "!" : "=")).
- ifNull(() -> {
- headerGenerator.addImport(MessageGenerator.NULL_NODE_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement("NullNode.instance"));
- }).
- ifShouldNotBeNull(() -> {
- generateVariableLengthTargetToJson(target, versions);
- }).
- generate(buffer);
- }
- }
-
- private void generateVariableLengthTargetToJson(Target target, Versions versions) {
- if (target.field().type().isString()) {
- headerGenerator.addImport(MessageGenerator.TEXT_NODE_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("new TextNode(%s)", target.sourceVariable())));
- } else if (target.field().type().isBytes()) {
- headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
- if (target.field().zeroCopy()) {
- headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("new BinaryNode(MessageUtil.byteBufferToArray(%s))",
- target.sourceVariable())));
- } else {
- headerGenerator.addImport(MessageGenerator.ARRAYS_CLASS);
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("new BinaryNode(Arrays.copyOf(%s, %s.length))",
- target.sourceVariable(), target.sourceVariable())));
- }
- } else if (target.field().type().isArray()) {
- headerGenerator.addImport(MessageGenerator.ARRAY_NODE_CLASS);
- headerGenerator.addImport(MessageGenerator.JSON_NODE_FACTORY_CLASS);
- FieldType.ArrayType arrayType = (FieldType.ArrayType) target.field().type();
- FieldType elementType = arrayType.elementType();
- String arrayInstanceName = String.format("_%sArray", target.field().camelCaseName());
- buffer.printf("ArrayNode %s = new ArrayNode(JsonNodeFactory.instance);%n", arrayInstanceName);
- buffer.printf("for (%s _element : %s) {%n",
- getBoxedJavaType(elementType), target.sourceVariable());
- buffer.incrementIndent();
- generateTargetToJson(target.arrayElementTarget(
- input -> String.format("%s.add(%s)", arrayInstanceName, input)),
- versions);
- buffer.decrementIndent();
- buffer.printf("}%n");
- buffer.printf("%s;%n", target.assignmentStatement(arrayInstanceName));
- } else if (target.field().type().isStruct()) {
- buffer.printf("%s;%n", target.assignmentStatement(
- String.format("%s.toJson(_version)", target.sourceVariable())));
- } else {
- throw new RuntimeException("unknown type " + target.field().type());
- }
- }
-
private void generateArrayFromStruct(FieldSpec field, Versions versions) {
IsNullConditional.forName("_nestedObjects").
possibleVersions(versions).
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
index f25a0e8..275b2c8 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
@@ -112,30 +112,6 @@ public final class MessageGenerator {
static final String MAP_ENTRY_CLASS = "java.util.Map.Entry";
- static final String JSON_NODE_CLASS = "com.fasterxml.jackson.databind.JsonNode";
-
- static final String OBJECT_NODE_CLASS = "com.fasterxml.jackson.databind.node.ObjectNode";
-
- static final String JSON_NODE_FACTORY_CLASS = "com.fasterxml.jackson.databind.node.JsonNodeFactory";
-
- static final String BOOLEAN_NODE_CLASS = "com.fasterxml.jackson.databind.node.BooleanNode";
-
- static final String SHORT_NODE_CLASS = "com.fasterxml.jackson.databind.node.ShortNode";
-
- static final String INT_NODE_CLASS = "com.fasterxml.jackson.databind.node.IntNode";
-
- static final String LONG_NODE_CLASS = "com.fasterxml.jackson.databind.node.LongNode";
-
- static final String TEXT_NODE_CLASS = "com.fasterxml.jackson.databind.node.TextNode";
-
- static final String BINARY_NODE_CLASS = "com.fasterxml.jackson.databind.node.BinaryNode";
-
- static final String NULL_NODE_CLASS = "com.fasterxml.jackson.databind.node.NullNode";
-
- static final String ARRAY_NODE_CLASS = "com.fasterxml.jackson.databind.node.ArrayNode";
-
- static final String DOUBLE_NODE_CLASS = "com.fasterxml.jackson.databind.node.DoubleNode";
-
/**
* The Jackson serializer we use for JSON objects.
*/