You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/14 23:58:47 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

ableegoldman commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r613642681



##########
File path: checkstyle/import-control.xml
##########
@@ -179,6 +179,7 @@
     </subpackage>
 
     <subpackage name="serialization">
+      <allow pkg="org.apache.kafka.clients" />

Review comment:
       Just wondering, what is the reason for this change?

##########
File path: clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
+
+    private Deserializer<Inner> inner;
+    private Class<?> listClass;
+    private Integer primitiveSize;
+
+    static private Map<Class<? extends Deserializer<?>>, Integer> fixedLengthDeserializers = mkMap(
+        mkEntry(ShortDeserializer.class, 2),
+        mkEntry(IntegerDeserializer.class, 4),
+        mkEntry(FloatDeserializer.class, 4),
+        mkEntry(LongDeserializer.class, 8),
+        mkEntry(DoubleDeserializer.class, 8),
+        mkEntry(UUIDDeserializer.class, 36)
+    );
+
+    public ListDeserializer() {}
+
+    public <L extends List<Inner>> ListDeserializer(Class<L> listClass, Deserializer<Inner> innerDeserializer) {
+        this.listClass = listClass;
+        this.inner = innerDeserializer;
+        if (innerDeserializer != null) {
+            this.primitiveSize = fixedLengthDeserializers.get(innerDeserializer.getClass());
+        }
+    }
+
+    public Deserializer<Inner> getInnerDeserializer() {
+        return inner;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        if (listClass == null) {
+            configureListClass(configs, isKey);
+        }
+        if (inner == null) {
+            configureInnerSerde(configs, isKey);
+        }
+    }
+
+    private void configureListClass(Map<String, ?> configs, boolean isKey) {
+        String listTypePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS;
+        final Object listClassOrName = configs.get(listTypePropertyName);
+        if (listClassOrName == null) {
+            throw new ConfigException("Not able to determine the list class because it was neither passed via the constructor nor set in the config.");
+        }
+        try {
+            if (listClassOrName instanceof String) {
+                listClass = Utils.loadClass((String) listClassOrName, Object.class);
+            } else if (listClassOrName instanceof Class) {
+                listClass = (Class<?>) listClassOrName;
+            } else {
+                throw new KafkaException("Could not determine the list class instance using \"" + listTypePropertyName + "\" property.");
+            }
+        } catch (final ClassNotFoundException e) {
+            throw new ConfigException(listTypePropertyName, listClassOrName, "Deserializer's list class \"" + listClassOrName + "\" could not be found.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void configureInnerSerde(Map<String, ?> configs, boolean isKey) {
+        String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;

Review comment:
       What about the `LIST_KEY_DESERIALIZER_INNER_CLASS_CONFIG`?

##########
File path: clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
+
+    private Deserializer<Inner> inner;
+    private Class<?> listClass;
+    private Integer primitiveSize;
+
+    static private Map<Class<? extends Deserializer<?>>, Integer> fixedLengthDeserializers = mkMap(
+        mkEntry(ShortDeserializer.class, 2),
+        mkEntry(IntegerDeserializer.class, 4),
+        mkEntry(FloatDeserializer.class, 4),
+        mkEntry(LongDeserializer.class, 8),
+        mkEntry(DoubleDeserializer.class, 8),
+        mkEntry(UUIDDeserializer.class, 36)
+    );
+
+    public ListDeserializer() {}
+
+    public <L extends List<Inner>> ListDeserializer(Class<L> listClass, Deserializer<Inner> innerDeserializer) {
+        this.listClass = listClass;
+        this.inner = innerDeserializer;
+        if (innerDeserializer != null) {

Review comment:
       Should it be valid for this to be null? I would think that these Serdes should be configured either by instantiating it directly via this constructor, or via the default constructor + setting configs (eg list.key.serializer.inner). It doesn't seem to make sense to use this constructor and not pass in valid arguments.
   WDYT about throwing an exception if either parameter is `null` -- not sure if ConfigException or IllegalArgumentException is more appropriate, up to you

##########
File path: clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
+
+    private Deserializer<Inner> inner;
+    private Class<?> listClass;
+    private Integer primitiveSize;
+
+    static private Map<Class<? extends Deserializer<?>>, Integer> fixedLengthDeserializers = mkMap(
+        mkEntry(ShortDeserializer.class, 2),
+        mkEntry(IntegerDeserializer.class, 4),
+        mkEntry(FloatDeserializer.class, 4),
+        mkEntry(LongDeserializer.class, 8),
+        mkEntry(DoubleDeserializer.class, 8),
+        mkEntry(UUIDDeserializer.class, 36)
+    );
+
+    public ListDeserializer() {}
+
+    public <L extends List<Inner>> ListDeserializer(Class<L> listClass, Deserializer<Inner> innerDeserializer) {
+        this.listClass = listClass;
+        this.inner = innerDeserializer;
+        if (innerDeserializer != null) {
+            this.primitiveSize = fixedLengthDeserializers.get(innerDeserializer.getClass());
+        }
+    }
+
+    public Deserializer<Inner> getInnerDeserializer() {
+        return inner;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        if (listClass == null) {
+            configureListClass(configs, isKey);
+        }
+        if (inner == null) {
+            configureInnerSerde(configs, isKey);
+        }
+    }
+
+    private void configureListClass(Map<String, ?> configs, boolean isKey) {
+        String listTypePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS;
+        final Object listClassOrName = configs.get(listTypePropertyName);
+        if (listClassOrName == null) {
+            throw new ConfigException("Not able to determine the list class because it was neither passed via the constructor nor set in the config.");
+        }
+        try {
+            if (listClassOrName instanceof String) {
+                listClass = Utils.loadClass((String) listClassOrName, Object.class);
+            } else if (listClassOrName instanceof Class) {
+                listClass = (Class<?>) listClassOrName;
+            } else {
+                throw new KafkaException("Could not determine the list class instance using \"" + listTypePropertyName + "\" property.");
+            }
+        } catch (final ClassNotFoundException e) {
+            throw new ConfigException(listTypePropertyName, listClassOrName, "Deserializer's list class \"" + listClassOrName + "\" could not be found.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void configureInnerSerde(Map<String, ?> configs, boolean isKey) {
+        String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;
+        final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName);
+        if (innerSerdeClassOrName == null) {
+            throw new ConfigException("Not able to determine the inner serde class because it was neither passed via the constructor nor set in the config.");
+        }
+        try {
+            if (innerSerdeClassOrName instanceof String) {
+                inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).deserializer();
+            } else if (innerSerdeClassOrName instanceof Class) {
+                inner = (Deserializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).deserializer();
+            } else {
+                throw new KafkaException("Could not determine the inner serde class instance using \"" + innerSerdePropertyName + "\" property.");
+            }
+            inner.configure(configs, isKey);
+        } catch (final ClassNotFoundException e) {
+            throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Deserializer's inner serde class \"" + innerSerdeClassOrName + "\" could not be found.");

Review comment:
       ```suggestion
               throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Deserializer's inner serde class \"" + innerSerdeClassOrName + "\" was not a valid Serde/Deserializer.");
   ```

##########
File path: clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
+
+    private Deserializer<Inner> inner;
+    private Class<?> listClass;
+    private Integer primitiveSize;
+
+    static private Map<Class<? extends Deserializer<?>>, Integer> fixedLengthDeserializers = mkMap(
+        mkEntry(ShortDeserializer.class, 2),
+        mkEntry(IntegerDeserializer.class, 4),
+        mkEntry(FloatDeserializer.class, 4),
+        mkEntry(LongDeserializer.class, 8),
+        mkEntry(DoubleDeserializer.class, 8),

Review comment:
       nit: can we use `Double.SIZE` instead of just `8`, that way it's super clear that this value actually means?

##########
File path: clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
+
+    private Deserializer<Inner> inner;
+    private Class<?> listClass;
+    private Integer primitiveSize;
+
+    static private Map<Class<? extends Deserializer<?>>, Integer> fixedLengthDeserializers = mkMap(

Review comment:
       nit: use `private static`  ordering (for consistency with the rest of the code base)

##########
File path: clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
+
+    private Deserializer<Inner> inner;
+    private Class<?> listClass;
+    private Integer primitiveSize;
+
+    static private Map<Class<? extends Deserializer<?>>, Integer> fixedLengthDeserializers = mkMap(
+        mkEntry(ShortDeserializer.class, 2),
+        mkEntry(IntegerDeserializer.class, 4),
+        mkEntry(FloatDeserializer.class, 4),
+        mkEntry(LongDeserializer.class, 8),
+        mkEntry(DoubleDeserializer.class, 8),
+        mkEntry(UUIDDeserializer.class, 36)
+    );
+
+    public ListDeserializer() {}
+
+    public <L extends List<Inner>> ListDeserializer(Class<L> listClass, Deserializer<Inner> innerDeserializer) {
+        this.listClass = listClass;
+        this.inner = innerDeserializer;
+        if (innerDeserializer != null) {
+            this.primitiveSize = fixedLengthDeserializers.get(innerDeserializer.getClass());
+        }
+    }
+
+    public Deserializer<Inner> getInnerDeserializer() {
+        return inner;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        if (listClass == null) {

Review comment:
       If the `listClass` and `inner` have already been set by invoking the non-default constructor, but the user also set the `list.key.deserializer.inner` configs, should we verify that the configs match and throw a ConfigException otherwise?

##########
File path: clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
##########
@@ -77,21 +87,39 @@ public void configure(Map<String, ?> configs, boolean isKey) {
         }
     }
 
+    private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException {
+        List<Integer> nullIndexList = IntStream.range(0, data.size())
+                .filter(i -> data.get(i) == null)
+                .boxed().collect(Collectors.toList());
+        out.writeInt(nullIndexList.size());
+        for (int i : nullIndexList) out.writeInt(i);
+    }
+
     @Override
     public byte[] serialize(String topic, List<Inner> data) {
         if (data == null) {
             return null;
         }
-        final int size = data.size();
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
              final DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeByte(serStrategy.ordinal()); // write serialization strategy flag
+            if (serStrategy == SerializationStrategy.NULL_INDEX_LIST) {
+                serializeNullIndexList(out, data);
+            }
+            final int size = data.size();
             out.writeInt(size);
             for (Inner entry : data) {
-                final byte[] bytes = inner.serialize(topic, entry);
-                if (!isFixedLength) {
-                    out.writeInt(bytes.length);
+                if (entry == null) {
+                    if (serStrategy == SerializationStrategy.NEGATIVE_SIZE) {
+                        out.writeInt(Serdes.ListSerde.NEGATIVE_SIZE_VALUE);
+                    }
+                } else {
+                    final byte[] bytes = inner.serialize(topic, entry);
+                    if (!isFixedLength || serStrategy == SerializationStrategy.NEGATIVE_SIZE) {
+                        out.writeInt(bytes.length);

Review comment:
       Hey, sorry that I'm jumping in here after there's been a long discussion which I missed, but I'm wondering why the serialization strategy would be configurable? IIUC the serialization strategy correctly, one of them basically means "constant-size data/primitive type, don't encode the size only length of list" while the other means "variable-size data, encode the size of each element only"
   
   I assume this is to allow users to indicate that their data is constant size when its a non-primitize type, to avoid the need to encode this same size data -- that makes sense to me. But I think we can simplify the API a bit so we don't have to let users shoot themselves in the foot, as you said earlier 🙂 
   
   How about: if it's a primitive type, and we can detect this (I think we should be able to), then we never encode the size info. If a user opts to do so, just log a warning and ignore it.
   
   By the way, this might also be due to some earlier discussion I missed, but I find the names of the two SerializationStrategy enums super confusing. How about just `VARIABLE_SIZE` and `CONSTANT_SIZE`? Imo it's better to describe what the enum actually _means_ than how its implemented, you can read the code to understand the latter. But you shouldn't need to read the code to understand what a config means. Plus, this way we have flexibility to change the underlying implementation if we ever need to without also having to change the enum names which are now a public API

##########
File path: clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
##########
@@ -128,13 +128,27 @@ public UUIDSerde() {
 
     static public final class ListSerde<Inner> extends WrapperSerde<List<Inner>> {
 
+        final static int NEGATIVE_SIZE_VALUE = -1;

Review comment:
       What is this? Can you give it a name that describes what it means a little more -- IIUC this is a sentinel that indicates "this list has variable-sized elements so we encode each element's size".
   
   That said, coming up with names is hard -- you can probably do a better job than me but just to throw out a suggestion, what about something like `VARIABLE_SIZE_SENTINEL`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org