You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/08/19 18:31:31 UTC

[kafka] branch 2.6 updated: Revert KAFKA-9309: Add the ability to translate Message to JSON (#9197)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 232a0f4  Revert KAFKA-9309: Add the ability to translate Message to JSON (#9197)
232a0f4 is described below

commit 232a0f48d3a2629578090937fb3b5097b78bd617
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Wed Aug 19 11:30:34 2020 -0700

    Revert KAFKA-9309: Add the ability to translate Message to JSON (#9197)
    
    This reverts commit bf6dffe93bbe0fe33ad076ebccebb840d66b936d
    
    Reviewers: Ismael Juma <is...@confluent.io>
---
 checkstyle/import-control.xml                      |   5 -
 .../org/apache/kafka/common/protocol/Message.java  |  42 ---
 .../apache/kafka/common/protocol/MessageUtil.java  | 104 +------
 .../apache/kafka/common/message/MessageTest.java   |  11 -
 .../apache/kafka/message/MessageDataGenerator.java | 323 ---------------------
 .../org/apache/kafka/message/MessageGenerator.java |  24 --
 6 files changed, 1 insertion(+), 508 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 8c3cb286..3dac90e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -123,7 +123,6 @@
       <allow pkg="org.apache.kafka.common.record" />
       <allow pkg="org.apache.kafka.common.requests" />
       <allow pkg="org.apache.kafka.common.resource" />
-      <allow pkg="com.fasterxml.jackson" />
     </subpackage>
 
     <subpackage name="record">
@@ -244,10 +243,6 @@
       <allow pkg="org.apache.kafka.connect.json" />
     </subpackage>
 
-    <subpackage name="internals">
-      <allow pkg="com.fasterxml.jackson" />
-    </subpackage>
-
     <subpackage name="perf">
       <allow pkg="com.fasterxml.jackson.databind" />
     </subpackage>
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Message.java b/clients/src/main/java/org/apache/kafka/common/protocol/Message.java
index 3ff3304..2a313ff 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Message.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Message.java
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.common.protocol;
 
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.common.protocol.types.RawTaggedField;
 import org.apache.kafka.common.protocol.types.Struct;
 
@@ -103,46 +101,6 @@ public interface Message {
     Struct toStruct(short version);
 
     /**
-     * Reads this message from a Jackson JsonNode object.  This will overwrite
-     * all relevant fields with information from the Struct.
-     *
-     * For the most part, we expect every JSON object in the input to be the
-     * correct type.  There is one exception: we will deserialize numbers
-     * represented as strings.  If the numeric string begins with 0x, we will
-     * treat the number as hexadecimal.
-     *
-     * Note that we expect to see NullNode objects created for null entries.
-     * Therefore, please configure your Jackson ObjectMapper with
-     * setSerializationInclusion({@link JsonInclude.Include#ALWAYS}).
-     * Other settings may silently omit the nulls, which is not the
-     * semantic that Kafka RPC uses.  (Including a field and setting it to
-     * null is different than not including the field.)
-     *
-     * @param node          The source node.
-     * @param version       The version to use.
-     *
-     * @throws {@see org.apache.kafka.common.errors.UnsupportedVersionException}
-     *                      If the specified JSON can't be processed with the
-     *                      specified message version.
-     */
-    void fromJson(JsonNode node, short version);
-
-    /**
-     * Convert this message to a JsonNode.
-     *
-     * Note that 64-bit numbers will be serialized as strings rather than as integers.
-     * The reason is because JavaScript can't represent numbers above 2**52 accurately.
-     * Therefore, for maximum interoperability, we represent these numbers as strings.
-     *
-     * @param version       The version to use.
-     *
-     * @throws {@see org.apache.kafka.common.errors.UnsupportedVersionException}
-     *                      If the specified version is too new to be supported
-     *                      by this software.
-     */
-    JsonNode toJson(short version);
-
-    /**
      * Returns a list of tagged fields which this software can't understand.
      *
      * @return              The raw tagged fields.
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java b/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java
index 21eb35e..62f8837 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java
@@ -17,13 +17,11 @@
 
 package org.apache.kafka.common.protocol;
 
-import com.fasterxml.jackson.databind.JsonNode;
-
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.UUID;
 
+
 public final class MessageUtil {
     public static final UUID ZERO_UUID = new UUID(0L, 0L);
 
@@ -55,106 +53,6 @@ public final class MessageUtil {
         return bld.toString();
     }
 
-    public static byte jsonNodeToByte(JsonNode node, String about) {
-        int value = jsonNodeToInt(node, about);
-        if (value > Byte.MAX_VALUE) {
-            if (value <= 256) {
-                // It's more traditional to refer to bytes as unsigned,
-                // so we support that here.
-                value -= 128;
-            } else {
-                throw new RuntimeException(about + ": value " + value +
-                    " does not fit in an 8-bit signed integer.");
-            }
-        }
-        if (value < Byte.MIN_VALUE) {
-            throw new RuntimeException(about + ": value " + value +
-                " does not fit in an 8-bit signed integer.");
-        }
-        return (byte) value;
-    }
-
-    public static short jsonNodeToShort(JsonNode node, String about) {
-        int value = jsonNodeToInt(node, about);
-        if ((value < Short.MIN_VALUE) || (value > Short.MAX_VALUE)) {
-            throw new RuntimeException(about + ": value " + value +
-                " does not fit in a 16-bit signed integer.");
-        }
-        return (short) value;
-    }
-
-    public static int jsonNodeToInt(JsonNode node, String about) {
-        if (node.isInt()) {
-            return node.asInt();
-        }
-        if (node.isTextual()) {
-            throw new NumberFormatException(about + ": expected an integer or " +
-                "string type, but got " + node.getNodeType());
-        }
-        String text = node.asText();
-        if (text.startsWith("0x")) {
-            try {
-                return Integer.parseInt(text.substring(2), 16);
-            } catch (NumberFormatException e) {
-                throw new NumberFormatException(about + ": failed to " +
-                    "parse hexadecimal number: " + e.getMessage());
-            }
-        } else {
-            try {
-                return Integer.parseInt(text);
-            } catch (NumberFormatException e) {
-                throw new NumberFormatException(about + ": failed to " +
-                    "parse number: " + e.getMessage());
-            }
-        }
-    }
-
-    public static long jsonNodeToLong(JsonNode node, String about) {
-        if (node.isLong()) {
-            return node.asLong();
-        }
-        if (node.isTextual()) {
-            throw new NumberFormatException(about + ": expected an integer or " +
-                "string type, but got " + node.getNodeType());
-        }
-        String text = node.asText();
-        if (text.startsWith("0x")) {
-            try {
-                return Long.parseLong(text.substring(2), 16);
-            } catch (NumberFormatException e) {
-                throw new NumberFormatException(about + ": failed to " +
-                    "parse hexadecimal number: " + e.getMessage());
-            }
-        } else {
-            try {
-                return Long.parseLong(text);
-            } catch (NumberFormatException e) {
-                throw new NumberFormatException(about + ": failed to " +
-                    "parse number: " + e.getMessage());
-            }
-        }
-    }
-
-    public static byte[] jsonNodeToBinary(JsonNode node, String about) {
-        if (!node.isBinary()) {
-            throw new RuntimeException(about + ": expected Base64-encoded binary data.");
-        }
-        try {
-            byte[] value = node.binaryValue();
-            return value;
-        } catch (IOException e) {
-            throw new RuntimeException(about + ": unable to retrieve Base64-encoded binary data", e);
-        }
-    }
-
-    public static double jsonNodeToDouble(JsonNode node, String about) {
-        if (!node.isFloatingPointNumber()) {
-            throw new NumberFormatException(about + ": expected a floating point " +
-                "type, but got " + node.getNodeType());
-        }
-        return node.asDouble();
-    }
-
     public static byte[] duplicate(byte[] array) {
         if (array == null) {
             return null;
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index d640f44..e0d7047 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.common.message;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
@@ -753,7 +752,6 @@ public final class MessageTest {
     private void testEquivalentMessageRoundTrip(short version, Message message) throws Exception {
         testStructRoundTrip(version, message, message);
         testByteBufferRoundTrip(version, message, message);
-        testJsonRoundTrip(version, message, message);
     }
 
     private void testByteBufferRoundTrip(short version, Message message, Message expected) throws Exception {
@@ -784,15 +782,6 @@ public final class MessageTest {
         assertEquals(expected.toString(), message2.toString());
     }
 
-    private void testJsonRoundTrip(short version, Message message, Message expected) throws Exception {
-        JsonNode jsonNode = message.toJson(version);
-        Message message2 = message.getClass().newInstance();
-        message2.fromJson(jsonNode, version);
-        assertEquals(expected, message2);
-        assertEquals(expected.hashCode(), message2.hashCode());
-        assertEquals(expected.toString(), message2.toString());
-    }
-
     /**
      * Verify that the JSON files support the same message versions as the
      * schemas accessible through the ApiKey class.
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index 90c8bbe..46ea053 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -98,10 +98,6 @@ public final class MessageDataGenerator {
         buffer.printf("%n");
         generateClassToStruct(className, struct, parentVersions);
         buffer.printf("%n");
-        generateClassFromJson(className, struct, parentVersions);
-        buffer.printf("%n");
-        generateClassToJson(className, struct, parentVersions);
-        buffer.printf("%n");
         generateClassSize(className, struct, parentVersions);
         if (isSetElement) {
             buffer.printf("%n");
@@ -440,15 +436,6 @@ public final class MessageDataGenerator {
         buffer.printf("}%n");
         buffer.printf("%n");
 
-        headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
-        buffer.printf("public %s(JsonNode _node, short _version) {%n", className);
-        buffer.incrementIndent();
-        buffer.printf("fromJson(_node, _version);%n");
-        generateConstructorEpilogue(isSetElement);
-        buffer.decrementIndent();
-        buffer.printf("}%n");
-        buffer.printf("%n");
-
         buffer.printf("public %s() {%n", className);
         buffer.incrementIndent();
         for (FieldSpec field : struct.fields()) {
@@ -823,316 +810,6 @@ public final class MessageDataGenerator {
         buffer.printf("}%n");
     }
 
-    private void generateClassFromJson(String className, StructSpec struct,
-                                       Versions parentVersions) {
-        headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
-        buffer.printf("@Override%n");
-        buffer.printf("public void fromJson(JsonNode _node, short _version) {%n");
-        buffer.incrementIndent();
-        VersionConditional.forVersions(struct.versions(), parentVersions).
-            allowMembershipCheckAlwaysFalse(false).
-            ifNotMember(__ -> {
-                headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
-                buffer.printf("throw new UnsupportedVersionException(\"Can't read " +
-                    "version \" + _version + \" of %s\");%n", className);
-            }).
-            generate(buffer);
-        Versions curVersions = parentVersions.intersect(struct.versions());
-        for (FieldSpec field : struct.fields()) {
-            String sourceVariable = String.format("_%sNode", field.camelCaseName());
-            buffer.printf("JsonNode %s = _node.get(\"%s\");%n",
-                sourceVariable,
-                field.camelCaseName());
-            buffer.printf("if (%s == null) {%n", sourceVariable);
-            buffer.incrementIndent();
-            Versions mandatoryVersions = field.versions().subtract(field.taggedVersions());
-            VersionConditional.forVersions(mandatoryVersions, curVersions).
-                ifMember(__ -> {
-                    buffer.printf("throw new RuntimeException(\"%s: unable to locate " +
-                        "field \'%s\', which is mandatory in version \" + _version);%n",
-                        className, field.camelCaseName());
-                }).
-                ifNotMember(__ -> {
-                    buffer.printf("this.%s = %s;%n", field.camelCaseName(), fieldDefault(field));
-                }).
-                generate(buffer);
-            buffer.decrementIndent();
-            buffer.printf("} else {%n");
-            buffer.incrementIndent();
-            VersionConditional.forVersions(struct.versions(), curVersions).
-                ifMember(presentVersions -> {
-                    generateTargetFromJson(new Target(field,
-                        sourceVariable,
-                        className,
-                        input -> String.format("this.%s = %s", field.camelCaseName(), input)),
-                        curVersions);
-                }).ifNotMember(__ -> {
-                    buffer.printf("throw new RuntimeException(\"%s: field \'%s\' is not " +
-                        "supported in version \" + _version);%n",
-                        className, field.camelCaseName());
-                }).
-                generate(buffer);
-            buffer.decrementIndent();
-            buffer.printf("}%n");
-        }
-        buffer.decrementIndent();
-        buffer.printf("}%n");
-    }
-
-    private void generateTargetFromJson(Target target, Versions curVersions) {
-        if (target.field().type() instanceof FieldType.BoolFieldType) {
-            buffer.printf("if (!%s.isBoolean()) {%n", target.sourceVariable());
-            buffer.incrementIndent();
-            buffer.printf("throw new RuntimeException(\"%s expected Boolean type, " +
-                "but got \" + _node.getNodeType());%n", target.humanReadableName());
-            buffer.decrementIndent();
-            buffer.printf("}%n");
-            buffer.printf("%s;%n", target.assignmentStatement(
-                target.sourceVariable() + ".asBoolean()"));
-        } else if (target.field().type() instanceof FieldType.Int8FieldType) {
-            headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("MessageUtil.jsonNodeToByte(%s, \"%s\")",
-                    target.sourceVariable(), target.humanReadableName())));
-        } else if (target.field().type() instanceof FieldType.Int16FieldType) {
-            headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("MessageUtil.jsonNodeToShort(%s, \"%s\")",
-                    target.sourceVariable(), target.humanReadableName())));
-        } else if (target.field().type() instanceof FieldType.Int32FieldType) {
-            headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("MessageUtil.jsonNodeToInt(%s, \"%s\")",
-                    target.sourceVariable(), target.humanReadableName())));
-        } else if (target.field().type() instanceof FieldType.Int64FieldType) {
-            headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("MessageUtil.jsonNodeToLong(%s, \"%s\")",
-                    target.sourceVariable(), target.humanReadableName())));
-        } else if (target.field().type() instanceof FieldType.UUIDFieldType) {
-            buffer.printf("if (!%s.isTextual()) {%n", target.sourceVariable());
-            buffer.incrementIndent();
-            buffer.printf("throw new RuntimeException(\"%s expected a JSON string " +
-                "type, but got \" + _node.getNodeType());%n", target.humanReadableName());
-            buffer.decrementIndent();
-            buffer.printf("}%n");
-            headerGenerator.addImport(MessageGenerator.UUID_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement(String.format(
-                "UUID.fromString(%s.asText())", target.sourceVariable())));
-        } else if (target.field().type() instanceof FieldType.Float64FieldType) {
-            headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("MessageUtil.jsonNodeToDouble(%s, \"%s\")",
-                    target.sourceVariable(), target.humanReadableName())));
-        } else {
-            // Handle the variable length types.  All of them are potentially
-            // nullable, so handle that here.
-            IsNullConditional.forName(target.sourceVariable()).
-                nullableVersions(target.field().nullableVersions()).
-                possibleVersions(curVersions).
-                conditionalGenerator((name, negated) ->
-                    String.format("%s%s.isNull()", negated ? "!" : "", name)).
-                ifNull(() -> {
-                    buffer.printf("%s;%n", target.assignmentStatement("null"));
-                }).
-                ifShouldNotBeNull(() -> {
-                    generateVariableLengthTargetFromJson(target, curVersions);
-                }).
-                generate(buffer);
-        }
-    }
-
-    private void generateVariableLengthTargetFromJson(Target target, Versions curVersions) {
-        if (target.field().type().isString()) {
-            buffer.printf("if (!%s.isTextual()) {%n", target.sourceVariable());
-            buffer.incrementIndent();
-            buffer.printf("throw new RuntimeException(\"%s expected a string " +
-                "type, but got \" + _node.getNodeType());%n", target.humanReadableName());
-            buffer.decrementIndent();
-            buffer.printf("}%n");
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("%s.asText()", target.sourceVariable())));
-        } else if (target.field().type().isBytes()) {
-            headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
-            if (target.field().zeroCopy()) {
-                headerGenerator.addImport(MessageGenerator.BYTE_BUFFER_CLASS);
-                buffer.printf("%s;%n", target.assignmentStatement(
-                    String.format("ByteBuffer.wrap(MessageUtil.jsonNodeToBinary(%s, \"%s\"))",
-                        target.sourceVariable(), target.humanReadableName())));
-            } else {
-                buffer.printf("%s;%n", target.assignmentStatement(
-                    String.format("MessageUtil.jsonNodeToBinary(%s, \"%s\")",
-                        target.sourceVariable(), target.humanReadableName())));
-            }
-        } else if (target.field().type().isArray()) {
-            buffer.printf("if (!%s.isArray()) {%n", target.sourceVariable());
-            buffer.incrementIndent();
-            buffer.printf("throw new RuntimeException(\"%s expected a JSON " +
-                "array, but got \" + _node.getNodeType());%n", target.humanReadableName());
-            buffer.decrementIndent();
-            buffer.printf("}%n");
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("new %s()", fieldConcreteJavaType(target.field()))));
-            headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
-            buffer.printf("for (JsonNode _element : %s) {%n", target.sourceVariable());
-            buffer.incrementIndent();
-            generateTargetFromJson(target.arrayElementTarget(
-                input -> String.format("%s.add(%s)", target.field().camelCaseName(), input)),
-                curVersions);
-            buffer.decrementIndent();
-            buffer.printf("}%n");
-        } else if (target.field().type().isStruct()) {
-            buffer.printf("%s;%n", target.assignmentStatement(String.format("new %s(%s, _version)",
-                target.field().type().toString(),
-                target.sourceVariable())));
-        } else {
-            throw new RuntimeException("Unexpected type " + target.field().type());
-        }
-    }
-
-    private void generateClassToJson(String className, StructSpec struct,
-                                     Versions parentVersions) {
-        headerGenerator.addImport(MessageGenerator.JSON_NODE_CLASS);
-        buffer.printf("@Override%n");
-        buffer.printf("public JsonNode toJson(short _version) {%n");
-        buffer.incrementIndent();
-        VersionConditional.forVersions(struct.versions(), parentVersions).
-            allowMembershipCheckAlwaysFalse(false).
-            ifNotMember(__ -> {
-                headerGenerator.addImport(MessageGenerator.UNSUPPORTED_VERSION_EXCEPTION_CLASS);
-                buffer.printf("throw new UnsupportedVersionException(\"Can't write " +
-                    "version \" + _version + \" of %s\");%n", className);
-            }).
-            generate(buffer);
-        Versions curVersions = parentVersions.intersect(struct.versions());
-        headerGenerator.addImport(MessageGenerator.OBJECT_NODE_CLASS);
-        headerGenerator.addImport(MessageGenerator.JSON_NODE_FACTORY_CLASS);
-        buffer.printf("ObjectNode _node = new ObjectNode(JsonNodeFactory.instance);%n");
-        for (FieldSpec field : struct.fields()) {
-            Target target = new Target(field,
-                String.format("this.%s", field.camelCaseName()),
-                field.camelCaseName(),
-                input -> String.format("_node.set(\"%s\", %s)", field.camelCaseName(), input));
-            VersionConditional cond = VersionConditional.forVersions(field.versions(), curVersions).
-                ifMember(presentVersions -> {
-                    VersionConditional.forVersions(field.taggedVersions(), presentVersions).
-                        ifMember(presentAndTaggedVersions -> {
-                            generateNonDefaultValueCheck(field, field.nullableVersions());
-                            buffer.incrementIndent();
-                            if (field.defaultString().equals("null")) {
-                                // If the default was null, and we already checked that this field was not
-                                // the default, we can omit further null checks.
-                                generateTargetToJson(target.nonNullableCopy(), presentAndTaggedVersions);
-                            } else {
-                                generateTargetToJson(target, presentAndTaggedVersions);
-                            }
-                            buffer.decrementIndent();
-                            buffer.printf("}%n");
-                        }).
-                        ifNotMember(presentAndNotTaggedVersions -> {
-                            generateTargetToJson(target, presentAndNotTaggedVersions);
-                        }).
-                        generate(buffer);
-                });
-            if (!field.ignorable()) {
-                cond.ifNotMember(__ -> {
-                    generateNonIgnorableFieldCheck(field);
-                });
-            }
-            cond.generate(buffer);
-        }
-        buffer.printf("return _node;%n");
-        buffer.decrementIndent();
-        buffer.printf("}%n");
-    }
-
-    private void generateTargetToJson(Target target, Versions versions) {
-        if (target.field().type() instanceof FieldType.BoolFieldType) {
-            headerGenerator.addImport(MessageGenerator.BOOLEAN_NODE_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("BooleanNode.valueOf(%s)", target.sourceVariable())));
-        } else if ((target.field().type() instanceof FieldType.Int8FieldType) ||
-                   (target.field().type() instanceof FieldType.Int16FieldType)) {
-            headerGenerator.addImport(MessageGenerator.SHORT_NODE_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("new ShortNode(%s)", target.sourceVariable())));
-        } else if (target.field().type() instanceof FieldType.Int32FieldType) {
-            headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("new IntNode(%s)", target.sourceVariable())));
-        } else if (target.field().type() instanceof FieldType.Int64FieldType) {
-            headerGenerator.addImport(MessageGenerator.LONG_NODE_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("new LongNode(%s)", target.sourceVariable())));
-        } else if (target.field().type() instanceof FieldType.UUIDFieldType) {
-            headerGenerator.addImport(MessageGenerator.TEXT_NODE_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("new TextNode(%s.toString())", target.sourceVariable())));
-        } else if (target.field().type() instanceof FieldType.Float64FieldType) {
-            headerGenerator.addImport(MessageGenerator.DOUBLE_NODE_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("new DoubleNode(%s)", target.sourceVariable())));
-        } else {
-            // Handle the variable length types.  All of them are potentially
-            // nullable, so handle that here.
-            IsNullConditional.forName(target.sourceVariable()).
-                nullableVersions(target.field().nullableVersions()).
-                possibleVersions(versions).
-                conditionalGenerator((name, negated) ->
-                    String.format("%s %s= null", name, negated ? "!" : "=")).
-                ifNull(() -> {
-                    headerGenerator.addImport(MessageGenerator.NULL_NODE_CLASS);
-                    buffer.printf("%s;%n", target.assignmentStatement("NullNode.instance"));
-                }).
-                ifShouldNotBeNull(() -> {
-                    generateVariableLengthTargetToJson(target, versions);
-                }).
-                generate(buffer);
-        }
-    }
-
-    private void generateVariableLengthTargetToJson(Target target, Versions versions) {
-        if (target.field().type().isString()) {
-            headerGenerator.addImport(MessageGenerator.TEXT_NODE_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("new TextNode(%s)", target.sourceVariable())));
-        } else if (target.field().type().isBytes()) {
-            headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
-            if (target.field().zeroCopy()) {
-                headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS);
-                buffer.printf("%s;%n", target.assignmentStatement(
-                    String.format("new BinaryNode(MessageUtil.byteBufferToArray(%s))",
-                        target.sourceVariable())));
-            } else {
-                headerGenerator.addImport(MessageGenerator.ARRAYS_CLASS);
-                buffer.printf("%s;%n", target.assignmentStatement(
-                    String.format("new BinaryNode(Arrays.copyOf(%s, %s.length))",
-                        target.sourceVariable(), target.sourceVariable())));
-            }
-        } else if (target.field().type().isArray()) {
-            headerGenerator.addImport(MessageGenerator.ARRAY_NODE_CLASS);
-            headerGenerator.addImport(MessageGenerator.JSON_NODE_FACTORY_CLASS);
-            FieldType.ArrayType arrayType = (FieldType.ArrayType) target.field().type();
-            FieldType elementType = arrayType.elementType();
-            String arrayInstanceName = String.format("_%sArray", target.field().camelCaseName());
-            buffer.printf("ArrayNode %s = new ArrayNode(JsonNodeFactory.instance);%n", arrayInstanceName);
-            buffer.printf("for (%s _element : %s) {%n",
-                getBoxedJavaType(elementType), target.sourceVariable());
-            buffer.incrementIndent();
-            generateTargetToJson(target.arrayElementTarget(
-                input -> String.format("%s.add(%s)", arrayInstanceName, input)),
-                versions);
-            buffer.decrementIndent();
-            buffer.printf("}%n");
-            buffer.printf("%s;%n", target.assignmentStatement(arrayInstanceName));
-        } else if (target.field().type().isStruct()) {
-            buffer.printf("%s;%n", target.assignmentStatement(
-                String.format("%s.toJson(_version)", target.sourceVariable())));
-        } else {
-            throw new RuntimeException("unknown type " + target.field().type());
-        }
-    }
-
     private void generateArrayFromStruct(FieldSpec field, Versions versions) {
         IsNullConditional.forName("_nestedObjects").
             possibleVersions(versions).
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
index f25a0e8..275b2c8 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
@@ -112,30 +112,6 @@ public final class MessageGenerator {
 
     static final String MAP_ENTRY_CLASS = "java.util.Map.Entry";
 
-    static final String JSON_NODE_CLASS = "com.fasterxml.jackson.databind.JsonNode";
-
-    static final String OBJECT_NODE_CLASS = "com.fasterxml.jackson.databind.node.ObjectNode";
-
-    static final String JSON_NODE_FACTORY_CLASS = "com.fasterxml.jackson.databind.node.JsonNodeFactory";
-
-    static final String BOOLEAN_NODE_CLASS = "com.fasterxml.jackson.databind.node.BooleanNode";
-
-    static final String SHORT_NODE_CLASS = "com.fasterxml.jackson.databind.node.ShortNode";
-
-    static final String INT_NODE_CLASS = "com.fasterxml.jackson.databind.node.IntNode";
-
-    static final String LONG_NODE_CLASS = "com.fasterxml.jackson.databind.node.LongNode";
-
-    static final String TEXT_NODE_CLASS = "com.fasterxml.jackson.databind.node.TextNode";
-
-    static final String BINARY_NODE_CLASS = "com.fasterxml.jackson.databind.node.BinaryNode";
-
-    static final String NULL_NODE_CLASS = "com.fasterxml.jackson.databind.node.NullNode";
-
-    static final String ARRAY_NODE_CLASS = "com.fasterxml.jackson.databind.node.ArrayNode";
-
-    static final String DOUBLE_NODE_CLASS = "com.fasterxml.jackson.databind.node.DoubleNode";
-
     /**
      * The Jackson serializer we use for JSON objects.
      */