You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/01/21 20:02:55 UTC

[kafka] branch 2.4 updated: KAFKA-9083: Various fixes/improvements for Connect's Values class (#7593)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 194c988  KAFKA-9083: Various fixes/improvements for Connect's Values class (#7593)
194c988 is described below

commit 194c988f1abfb4ff9a3ee2ab806b76496ced6bd9
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Tue Jan 21 13:33:50 2020 -0600

    KAFKA-9083: Various fixes/improvements for Connect's Values class (#7593)
    
    Author: Chris Egerton <ch...@confluent.io>
    Reviewers: Greg Harris <gr...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
 .../apache/kafka/connect/data/SchemaBuilder.java   |  20 +++
 .../java/org/apache/kafka/connect/data/Values.java | 108 +++++++++----
 .../org/apache/kafka/connect/data/ValuesTest.java  | 167 +++++++++++++++++++--
 .../connect/storage/SimpleHeaderConverterTest.java |  30 +++-
 4 files changed, 278 insertions(+), 47 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
index fdcf05a..722f5fc 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
@@ -382,6 +382,26 @@ public class SchemaBuilder implements Schema {
         return builder;
     }
 
+    static SchemaBuilder arrayOfNull() {
+        return new SchemaBuilder(Type.ARRAY);
+    }
+
+    static SchemaBuilder mapOfNull() {
+        return new SchemaBuilder(Type.MAP);
+    }
+
+    static SchemaBuilder mapWithNullKeys(Schema valueSchema) {
+        SchemaBuilder result = new SchemaBuilder(Type.MAP);
+        result.valueSchema = valueSchema;
+        return result;
+    }
+
+    static SchemaBuilder mapWithNullValues(Schema keySchema) {
+        SchemaBuilder result = new SchemaBuilder(Type.MAP);
+        result.keySchema = keySchema;
+        return result;
+    }
+
     @Override
     public Schema keySchema() {
         return keySchema;
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index c1bebdf..93c320a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -723,14 +723,30 @@ public class Values {
         return new SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN);
     }
 
+    protected static boolean canParseSingleTokenLiteral(Parser parser, boolean embedded, String tokenLiteral) {
+        int startPosition = parser.mark();
+        // If the next token is what we expect, then either...
+        if (parser.canConsume(tokenLiteral)) {
+            //   ...we're reading an embedded value, in which case the next token will be handled appropriately
+            //      by the caller if it's something like an end delimiter for a map or array, or a comma to
+            //      separate multiple embedded values...
+            //   ...or it's being parsed as part of a top-level string, in which case, any other tokens should
+            //      cause us to stop parsing this single-token literal as such and instead just treat it like
+            //      a string. For example, the top-level string "true}" will be tokenized as the tokens "true" and
+            //      "}", but should ultimately be parsed as just the string "true}" instead of the boolean true.
+            if (embedded || !parser.hasNext()) {
+                return true;
+            }
+        }
+        parser.rewindTo(startPosition);
+        return false;
+    }
+
     protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException {
         if (!parser.hasNext()) {
             return null;
         }
         if (embedded) {
-            if (parser.canConsume(NULL_VALUE)) {
-                return null;
-            }
             if (parser.canConsume(QUOTE_DELIMITER)) {
                 StringBuilder sb = new StringBuilder();
                 while (parser.hasNext()) {
@@ -742,34 +758,51 @@ public class Values {
                 return new SchemaAndValue(Schema.STRING_SCHEMA, sb.toString());
             }
         }
-        if (parser.canConsume(TRUE_LITERAL)) {
+
+        if (canParseSingleTokenLiteral(parser, embedded, NULL_VALUE)) {
+            return null;
+        }
+        if (canParseSingleTokenLiteral(parser, embedded, TRUE_LITERAL)) {
             return TRUE_SCHEMA_AND_VALUE;
         }
-        if (parser.canConsume(FALSE_LITERAL)) {
+        if (canParseSingleTokenLiteral(parser, embedded, FALSE_LITERAL)) {
             return FALSE_SCHEMA_AND_VALUE;
         }
+
         int startPosition = parser.mark();
+
         try {
             if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) {
                 List<Object> result = new ArrayList<>();
                 Schema elementSchema = null;
                 while (parser.hasNext()) {
                     if (parser.canConsume(ARRAY_END_DELIMITER)) {
-                        Schema listSchema = null;
+                        Schema listSchema;
                         if (elementSchema != null) {
                             listSchema = SchemaBuilder.array(elementSchema).schema();
+                            result = alignListEntriesWithSchema(listSchema, result);
+                        } else {
+                            // Every value is null
+                            listSchema = SchemaBuilder.arrayOfNull().build();
                         }
-                        result = alignListEntriesWithSchema(listSchema, result);
                         return new SchemaAndValue(listSchema, result);
                     }
+
                     if (parser.canConsume(COMMA_DELIMITER)) {
                         throw new DataException("Unable to parse an empty array element: " + parser.original());
                     }
                     SchemaAndValue element = parse(parser, true);
                     elementSchema = commonSchemaFor(elementSchema, element);
-                    result.add(element.value());
-                    parser.canConsume(COMMA_DELIMITER);
+                    result.add(element != null ? element.value() : null);
+
+                    int currentPosition = parser.mark();
+                    if (parser.canConsume(ARRAY_END_DELIMITER)) {
+                        parser.rewindTo(currentPosition);
+                    } else if (!parser.canConsume(COMMA_DELIMITER)) {
+                        throw new DataException("Array elements missing '" + COMMA_DELIMITER + "' delimiter");
+                    }
                 }
+
                 // Missing either a comma or an end delimiter
                 if (COMMA_DELIMITER.equals(parser.previous())) {
                     throw new DataException("Array is missing element after ',': " + parser.original());
@@ -783,26 +816,34 @@ public class Values {
                 Schema valueSchema = null;
                 while (parser.hasNext()) {
                     if (parser.canConsume(MAP_END_DELIMITER)) {
-                        Schema mapSchema = null;
+                        Schema mapSchema;
                         if (keySchema != null && valueSchema != null) {
-                            mapSchema = SchemaBuilder.map(keySchema, valueSchema).schema();
+                            mapSchema = SchemaBuilder.map(keySchema, valueSchema).build();
+                            result = alignMapKeysAndValuesWithSchema(mapSchema, result);
+                        } else if (keySchema != null) {
+                            mapSchema = SchemaBuilder.mapWithNullValues(keySchema);
+                            result = alignMapKeysWithSchema(mapSchema, result);
+                        } else {
+                            mapSchema = SchemaBuilder.mapOfNull().build();
                         }
-                        result = alignMapKeysAndValuesWithSchema(mapSchema, result);
                         return new SchemaAndValue(mapSchema, result);
                     }
+
                     if (parser.canConsume(COMMA_DELIMITER)) {
-                        throw new DataException("Unable to parse a map entry has no key or value: " + parser.original());
+                        throw new DataException("Unable to parse a map entry with no key or value: " + parser.original());
                     }
                     SchemaAndValue key = parse(parser, true);
                     if (key == null || key.value() == null) {
                         throw new DataException("Map entry may not have a null key: " + parser.original());
                     }
+
                     if (!parser.canConsume(ENTRY_DELIMITER)) {
                         throw new DataException("Map entry is missing '=': " + parser.original());
                     }
                     SchemaAndValue value = parse(parser, true);
                     Object entryValue = value != null ? value.value() : null;
                     result.put(key.value(), entryValue);
+
                     parser.canConsume(COMMA_DELIMITER);
                     keySchema = commonSchemaFor(keySchema, key);
                     valueSchema = commonSchemaFor(valueSchema, value);
@@ -811,14 +852,19 @@ public class Values {
                 if (COMMA_DELIMITER.equals(parser.previous())) {
                     throw new DataException("Map is missing element after ',': " + parser.original());
                 }
-                throw new DataException("Map is missing terminating ']': " + parser.original());
+                throw new DataException("Map is missing terminating '}': " + parser.original());
             }
         } catch (DataException e) {
-            LOG.debug("Unable to parse the value as a map; reverting to string", e);
+            LOG.debug("Unable to parse the value as a map or an array; reverting to string", e);
             parser.rewindTo(startPosition);
         }
-        String token = parser.next().trim();
-        assert !token.isEmpty(); // original can be empty string but is handled right away; no way for token to be empty here
+
+        String token = parser.next();
+        if (token.trim().isEmpty()) {
+            return new SchemaAndValue(Schema.STRING_SCHEMA, token);
+        }
+        token = token.trim();
+
         char firstChar = token.charAt(0);
         boolean firstCharIsDigit = Character.isDigit(firstChar);
         if (firstCharIsDigit || firstChar == '+' || firstChar == '-') {
@@ -878,6 +924,9 @@ public class Values {
                 }
             }
         }
+        if (embedded) {
+            throw new DataException("Failed to parse embedded value");
+        }
         // At this point, the only thing this can be is a string. Embedded strings were processed above,
         // so this is not embedded and we can use the original string...
         return new SchemaAndValue(Schema.STRING_SCHEMA, parser.original());
@@ -953,9 +1002,6 @@ public class Values {
     }
 
     protected static List<Object> alignListEntriesWithSchema(Schema schema, List<Object> input) {
-        if (schema == null) {
-            return input;
-        }
         Schema valueSchema = schema.valueSchema();
         List<Object> result = new ArrayList<>();
         for (Object value : input) {
@@ -966,9 +1012,6 @@ public class Values {
     }
 
     protected static Map<Object, Object> alignMapKeysAndValuesWithSchema(Schema mapSchema, Map<Object, Object> input) {
-        if (mapSchema == null) {
-            return input;
-        }
         Schema keySchema = mapSchema.keySchema();
         Schema valueSchema = mapSchema.valueSchema();
         Map<Object, Object> result = new LinkedHashMap<>();
@@ -980,6 +1023,16 @@ public class Values {
         return result;
     }
 
+    protected static Map<Object, Object> alignMapKeysWithSchema(Schema mapSchema, Map<Object, Object> input) {
+        Schema keySchema = mapSchema.keySchema();
+        Map<Object, Object> result = new LinkedHashMap<>();
+        for (Map.Entry<?, ?> entry : input.entrySet()) {
+            Object newKey = convertTo(keySchema, null, entry.getKey());
+            result.put(newKey, entry.getValue());
+        }
+        return result;
+    }
+
     protected static class SchemaDetector {
         private Type knownType = null;
         private boolean optional = false;
@@ -1123,12 +1176,13 @@ public class Values {
                 nextToken = consumeNextToken();
             }
             if (ignoreLeadingAndTrailingWhitespace) {
-                nextToken = nextToken.trim();
-                while (nextToken.isEmpty() && canConsumeNextToken()) {
-                    nextToken = consumeNextToken().trim();
+                while (nextToken.trim().isEmpty() && canConsumeNextToken()) {
+                    nextToken = consumeNextToken();
                 }
             }
-            return nextToken.equals(expected);
+            return ignoreLeadingAndTrailingWhitespace
+                ? nextToken.trim().equals(expected)
+                : nextToken.equals(expected);
         }
     }
 }
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
index 162222d..a5909f3 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
@@ -22,6 +22,9 @@ import org.apache.kafka.connect.errors.DataException;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +38,8 @@ import static org.junit.Assert.fail;
 
 public class ValuesTest {
 
+    private static final String WHITESPACE = "\n \t \t\n";
+
     private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
 
     private static final Map<String, String> STRING_MAP = new LinkedHashMap<>();
@@ -68,6 +73,149 @@ public class ValuesTest {
     }
 
     @Test
+    public void shouldNotParseUnquotedEmbeddedMapKeysAsStrings() {
+        SchemaAndValue schemaAndValue = Values.parseString("{foo: 3}");
+        assertEquals(Type.STRING, schemaAndValue.schema().type());
+        assertEquals("{foo: 3}", schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldNotParseUnquotedEmbeddedMapValuesAsStrings() {
+        SchemaAndValue schemaAndValue = Values.parseString("{3: foo}");
+        assertEquals(Type.STRING, schemaAndValue.schema().type());
+        assertEquals("{3: foo}", schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldNotParseUnquotedArrayElementsAsStrings() {
+        SchemaAndValue schemaAndValue = Values.parseString("[foo]");
+        assertEquals(Type.STRING, schemaAndValue.schema().type());
+        assertEquals("[foo]", schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldNotParseStringsBeginningWithNullAsStrings() {
+        SchemaAndValue schemaAndValue = Values.parseString("null=");
+        assertEquals(Type.STRING, schemaAndValue.schema().type());
+        assertEquals("null=", schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldParseStringsBeginningWithTrueAsStrings() {
+        SchemaAndValue schemaAndValue = Values.parseString("true}");
+        assertEquals(Type.STRING, schemaAndValue.schema().type());
+        assertEquals("true}", schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldParseStringsBeginningWithFalseAsStrings() {
+        SchemaAndValue schemaAndValue = Values.parseString("false]");
+        assertEquals(Type.STRING, schemaAndValue.schema().type());
+        assertEquals("false]", schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldParseTrueAsBooleanIfSurroundedByWhitespace() {
+        SchemaAndValue schemaAndValue = Values.parseString(WHITESPACE + "true" + WHITESPACE);
+        assertEquals(Type.BOOLEAN, schemaAndValue.schema().type());
+        assertEquals(true, schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldParseFalseAsBooleanIfSurroundedByWhitespace() {
+        SchemaAndValue schemaAndValue = Values.parseString(WHITESPACE + "false" + WHITESPACE);
+        assertEquals(Type.BOOLEAN, schemaAndValue.schema().type());
+        assertEquals(false, schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldParseNullAsNullIfSurroundedByWhitespace() {
+        SchemaAndValue schemaAndValue = Values.parseString(WHITESPACE + "null" + WHITESPACE);
+        assertNull(schemaAndValue);
+    }
+
+    @Test
+    public void shouldParseBooleanLiteralsEmbeddedInArray() {
+        SchemaAndValue schemaAndValue = Values.parseString("[true, false]");
+        assertEquals(Type.ARRAY, schemaAndValue.schema().type());
+        assertEquals(Type.BOOLEAN, schemaAndValue.schema().valueSchema().type());
+        assertEquals(Arrays.asList(true, false), schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldParseBooleanLiteralsEmbeddedInMap() {
+        SchemaAndValue schemaAndValue = Values.parseString("{true: false, false: true}");
+        assertEquals(Type.MAP, schemaAndValue.schema().type());
+        assertEquals(Type.BOOLEAN, schemaAndValue.schema().keySchema().type());
+        assertEquals(Type.BOOLEAN, schemaAndValue.schema().valueSchema().type());
+        Map<Boolean, Boolean> expectedValue = new HashMap<>();
+        expectedValue.put(true, false);
+        expectedValue.put(false, true);
+        assertEquals(expectedValue, schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldNotParseAsMapWithoutCommas() {
+        SchemaAndValue schemaAndValue = Values.parseString("{6:9 4:20}");
+        assertEquals(Type.STRING, schemaAndValue.schema().type());
+        assertEquals("{6:9 4:20}", schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldNotParseAsArrayWithoutCommas() {
+        SchemaAndValue schemaAndValue = Values.parseString("[0 1 2]");
+        assertEquals(Type.STRING, schemaAndValue.schema().type());
+        assertEquals("[0 1 2]", schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldParseEmptyMap() {
+        SchemaAndValue schemaAndValue = Values.parseString("{}");
+        assertEquals(Type.MAP, schemaAndValue.schema().type());
+        assertEquals(Collections.emptyMap(), schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldParseEmptyArray() {
+        SchemaAndValue schemaAndValue = Values.parseString("[]");
+        assertEquals(Type.ARRAY, schemaAndValue.schema().type());
+        assertEquals(Collections.emptyList(), schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldNotParseAsMapWithNullKeys() {
+        SchemaAndValue schemaAndValue = Values.parseString("{null: 3}");
+        assertEquals(Type.STRING, schemaAndValue.schema().type());
+        assertEquals("{null: 3}", schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldParseNull() {
+        SchemaAndValue schemaAndValue = Values.parseString("null");
+        assertNull(schemaAndValue);
+    }
+
+    @Test
+    public void shouldConvertStringOfNull() {
+        assertRoundTrip(Schema.STRING_SCHEMA, "null");
+    }
+
+    @Test
+    public void shouldParseNullMapValues() {
+        SchemaAndValue schemaAndValue = Values.parseString("{3: null}");
+        assertEquals(Type.MAP, schemaAndValue.schema().type());
+        assertEquals(Type.INT8, schemaAndValue.schema().keySchema().type());
+        assertEquals(Collections.singletonMap((byte) 3, null), schemaAndValue.value());
+    }
+
+    @Test
+    public void shouldParseNullArrayElements() {
+        SchemaAndValue schemaAndValue = Values.parseString("[null]");
+        assertEquals(Type.ARRAY, schemaAndValue.schema().type());
+        assertEquals(Collections.singletonList(null), schemaAndValue.value());
+    }
+
+    @Test
     public void shouldEscapeStringsWithEmbeddedQuotesAndBackslashes() {
         String original = "three\"blind\\\"mice";
         String expected = "three\\\"blind\\\\\\\"mice";
@@ -214,7 +362,8 @@ public class ValuesTest {
     public void shouldParseStringListWithMultipleElementTypesAndReturnListWithNoSchema() {
         String str = "[1, 2, 3, \"four\"]";
         SchemaAndValue result = Values.parseString(str);
-        assertNull(result.schema());
+        assertEquals(Type.ARRAY, result.schema().type());
+        assertNull(result.schema().valueSchema());
         List<?> list = (List<?>) result.value();
         assertEquals(4, list.size());
         assertEquals(1, ((Number) list.get(0)).intValue());
@@ -256,7 +405,7 @@ public class ValuesTest {
      */
     @Test(expected = DataException.class)
     public void shouldFailToParseStringOfMapWithIntValuesWithBlankEntry() {
-        Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" :  1234567890 ,, \"bar\" : 0,  \"baz\" : -987654321 }  ");
+        Values.convertToMap(Schema.STRING_SCHEMA, " { \"foo\" :  1234567890 ,, \"bar\" : 0,  \"baz\" : -987654321 }  ");
     }
 
     /**
@@ -264,7 +413,7 @@ public class ValuesTest {
      */
     @Test(expected = DataException.class)
     public void shouldFailToParseStringOfMalformedMap() {
-        Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" :  1234567890 , \"a\", \"bar\" : 0,  \"baz\" : -987654321 }  ");
+        Values.convertToMap(Schema.STRING_SCHEMA, " { \"foo\" :  1234567890 , \"a\", \"bar\" : 0,  \"baz\" : -987654321 }  ");
     }
 
     /**
@@ -272,7 +421,7 @@ public class ValuesTest {
      */
     @Test(expected = DataException.class)
     public void shouldFailToParseStringOfMapWithIntValuesWithOnlyBlankEntries() {
-        Values.convertToList(Schema.STRING_SCHEMA, " { ,,  , , }  ");
+        Values.convertToMap(Schema.STRING_SCHEMA, " { ,,  , , }  ");
     }
 
     /**
@@ -280,15 +429,7 @@ public class ValuesTest {
      */
     @Test(expected = DataException.class)
     public void shouldFailToParseStringOfMapWithIntValuesWithBlankEntries() {
-        Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" :  \"1234567890\" ,, \"bar\" : \"0\",  \"baz\" : \"boz\" }  ");
-    }
-
-    /**
-     * Schema for Map requires a schema for key and value, but we have no key or value and Connect has no "any" type
-     */
-    @Test(expected = DataException.class)
-    public void shouldFailToParseStringOfEmptyMap() {
-        Values.convertToList(Schema.STRING_SCHEMA, " { }  ");
+        Values.convertToMap(Schema.STRING_SCHEMA, " { \"foo\" :  \"1234567890\" ,, \"bar\" : \"0\",  \"baz\" : \"boz\" }  ");
     }
 
     @Test
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java
index fdfdd32..58a17f5 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java
@@ -164,11 +164,15 @@ public class SimpleHeaderConverterTest {
     }
 
     @Test
-    public void shouldConvertMapWithStringKeysAndMixedValuesToMapWithoutSchema() {
+    public void shouldConvertMapWithStringKeysAndMixedValuesToMap() {
         Map<String, Object> map = new LinkedHashMap<>();
         map.put("foo", "bar");
         map.put("baz", (short) 3456);
-        assertRoundTrip(null, map);
+        SchemaAndValue result = roundTrip(null, map);
+        assertEquals(Schema.Type.MAP, result.schema().type());
+        assertEquals(Schema.Type.STRING, result.schema().keySchema().type());
+        assertNull(result.schema().valueSchema());
+        assertEquals(map, result.value());
     }
 
     @Test
@@ -176,17 +180,29 @@ public class SimpleHeaderConverterTest {
         List<Object> list = new ArrayList<>();
         list.add("foo");
         list.add((short) 13344);
-        assertRoundTrip(null, list);
+        SchemaAndValue result = roundTrip(null, list);
+        assertEquals(Schema.Type.ARRAY, result.schema().type());
+        assertNull(result.schema().valueSchema());
+        assertEquals(list, result.value());
     }
 
     @Test
-    public void shouldConvertEmptyMapToMapWithoutSchema() {
-        assertRoundTrip(null, new LinkedHashMap<>());
+    public void shouldConvertEmptyMapToMap() {
+        Map<Object, Object> map = new LinkedHashMap<>();
+        SchemaAndValue result = roundTrip(null, map);
+        assertEquals(Schema.Type.MAP, result.schema().type());
+        assertNull(result.schema().keySchema());
+        assertNull(result.schema().valueSchema());
+        assertEquals(map, result.value());
     }
 
     @Test
-    public void shouldConvertEmptyListToListWithoutSchema() {
-        assertRoundTrip(null, new ArrayList<>());
+    public void shouldConvertEmptyListToList() {
+        List<Object> list = new ArrayList<>();
+        SchemaAndValue result = roundTrip(null, list);
+        assertEquals(Schema.Type.ARRAY, result.schema().type());
+        assertNull(result.schema().valueSchema());
+        assertEquals(list, result.value());
     }
 
     protected SchemaAndValue roundTrip(Schema schema, Object input) {