You are viewing a plain text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/01/21 20:17:17 UTC
[kafka] 02/02: KAFKA-9083: Various fixes/improvements for Connect's Values class (#7593)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4d36a9e634170e968f37f650607802794eefc7ba
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Tue Jan 21 13:33:50 2020 -0600
KAFKA-9083: Various fixes/improvements for Connect's Values class (#7593)
Author: Chris Egerton <ch...@confluent.io>
Reviewers: Greg Harris <gr...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
.../apache/kafka/connect/data/SchemaBuilder.java | 20 +++
.../java/org/apache/kafka/connect/data/Values.java | 108 +++++++++----
.../org/apache/kafka/connect/data/ValuesTest.java | 167 +++++++++++++++++++--
.../connect/storage/SimpleHeaderConverterTest.java | 30 +++-
4 files changed, 278 insertions(+), 47 deletions(-)
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
index fdcf05a..722f5fc 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
@@ -382,6 +382,26 @@ public class SchemaBuilder implements Schema {
return builder;
}
+ static SchemaBuilder arrayOfNull() {
+ return new SchemaBuilder(Type.ARRAY);
+ }
+
+ static SchemaBuilder mapOfNull() {
+ return new SchemaBuilder(Type.MAP);
+ }
+
+ static SchemaBuilder mapWithNullKeys(Schema valueSchema) {
+ SchemaBuilder result = new SchemaBuilder(Type.MAP);
+ result.valueSchema = valueSchema;
+ return result;
+ }
+
+ static SchemaBuilder mapWithNullValues(Schema keySchema) {
+ SchemaBuilder result = new SchemaBuilder(Type.MAP);
+ result.keySchema = keySchema;
+ return result;
+ }
+
@Override
public Schema keySchema() {
return keySchema;
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index c1bebdf..93c320a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -723,14 +723,30 @@ public class Values {
return new SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN);
}
+ protected static boolean canParseSingleTokenLiteral(Parser parser, boolean embedded, String tokenLiteral) {
+ int startPosition = parser.mark();
+ // If the next token is what we expect, then either...
+ if (parser.canConsume(tokenLiteral)) {
+ // ...we're reading an embedded value, in which case the next token will be handled appropriately
+ // by the caller if it's something like an end delimiter for a map or array, or a comma to
+ // separate multiple embedded values...
+ // ...or it's being parsed as part of a top-level string, in which case, any other tokens should
+ // cause use to stop parsing this single-token literal as such and instead just treat it like
+ // a string. For example, the top-level string "true}" will be tokenized as the tokens "true" and
+ // "}", but should ultimately be parsed as just the string "true}" instead of the boolean true.
+ if (embedded || !parser.hasNext()) {
+ return true;
+ }
+ }
+ parser.rewindTo(startPosition);
+ return false;
+ }
+
protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException {
if (!parser.hasNext()) {
return null;
}
if (embedded) {
- if (parser.canConsume(NULL_VALUE)) {
- return null;
- }
if (parser.canConsume(QUOTE_DELIMITER)) {
StringBuilder sb = new StringBuilder();
while (parser.hasNext()) {
@@ -742,34 +758,51 @@ public class Values {
return new SchemaAndValue(Schema.STRING_SCHEMA, sb.toString());
}
}
- if (parser.canConsume(TRUE_LITERAL)) {
+
+ if (canParseSingleTokenLiteral(parser, embedded, NULL_VALUE)) {
+ return null;
+ }
+ if (canParseSingleTokenLiteral(parser, embedded, TRUE_LITERAL)) {
return TRUE_SCHEMA_AND_VALUE;
}
- if (parser.canConsume(FALSE_LITERAL)) {
+ if (canParseSingleTokenLiteral(parser, embedded, FALSE_LITERAL)) {
return FALSE_SCHEMA_AND_VALUE;
}
+
int startPosition = parser.mark();
+
try {
if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) {
List<Object> result = new ArrayList<>();
Schema elementSchema = null;
while (parser.hasNext()) {
if (parser.canConsume(ARRAY_END_DELIMITER)) {
- Schema listSchema = null;
+ Schema listSchema;
if (elementSchema != null) {
listSchema = SchemaBuilder.array(elementSchema).schema();
+ result = alignListEntriesWithSchema(listSchema, result);
+ } else {
+ // Every value is null
+ listSchema = SchemaBuilder.arrayOfNull().build();
}
- result = alignListEntriesWithSchema(listSchema, result);
return new SchemaAndValue(listSchema, result);
}
+
if (parser.canConsume(COMMA_DELIMITER)) {
throw new DataException("Unable to parse an empty array element: " + parser.original());
}
SchemaAndValue element = parse(parser, true);
elementSchema = commonSchemaFor(elementSchema, element);
- result.add(element.value());
- parser.canConsume(COMMA_DELIMITER);
+ result.add(element != null ? element.value() : null);
+
+ int currentPosition = parser.mark();
+ if (parser.canConsume(ARRAY_END_DELIMITER)) {
+ parser.rewindTo(currentPosition);
+ } else if (!parser.canConsume(COMMA_DELIMITER)) {
+ throw new DataException("Array elements missing '" + COMMA_DELIMITER + "' delimiter");
+ }
}
+
// Missing either a comma or an end delimiter
if (COMMA_DELIMITER.equals(parser.previous())) {
throw new DataException("Array is missing element after ',': " + parser.original());
@@ -783,26 +816,34 @@ public class Values {
Schema valueSchema = null;
while (parser.hasNext()) {
if (parser.canConsume(MAP_END_DELIMITER)) {
- Schema mapSchema = null;
+ Schema mapSchema;
if (keySchema != null && valueSchema != null) {
- mapSchema = SchemaBuilder.map(keySchema, valueSchema).schema();
+ mapSchema = SchemaBuilder.map(keySchema, valueSchema).build();
+ result = alignMapKeysAndValuesWithSchema(mapSchema, result);
+ } else if (keySchema != null) {
+ mapSchema = SchemaBuilder.mapWithNullValues(keySchema);
+ result = alignMapKeysWithSchema(mapSchema, result);
+ } else {
+ mapSchema = SchemaBuilder.mapOfNull().build();
}
- result = alignMapKeysAndValuesWithSchema(mapSchema, result);
return new SchemaAndValue(mapSchema, result);
}
+
if (parser.canConsume(COMMA_DELIMITER)) {
- throw new DataException("Unable to parse a map entry has no key or value: " + parser.original());
+ throw new DataException("Unable to parse a map entry with no key or value: " + parser.original());
}
SchemaAndValue key = parse(parser, true);
if (key == null || key.value() == null) {
throw new DataException("Map entry may not have a null key: " + parser.original());
}
+
if (!parser.canConsume(ENTRY_DELIMITER)) {
throw new DataException("Map entry is missing '=': " + parser.original());
}
SchemaAndValue value = parse(parser, true);
Object entryValue = value != null ? value.value() : null;
result.put(key.value(), entryValue);
+
parser.canConsume(COMMA_DELIMITER);
keySchema = commonSchemaFor(keySchema, key);
valueSchema = commonSchemaFor(valueSchema, value);
@@ -811,14 +852,19 @@ public class Values {
if (COMMA_DELIMITER.equals(parser.previous())) {
throw new DataException("Map is missing element after ',': " + parser.original());
}
- throw new DataException("Map is missing terminating ']': " + parser.original());
+ throw new DataException("Map is missing terminating '}': " + parser.original());
}
} catch (DataException e) {
- LOG.debug("Unable to parse the value as a map; reverting to string", e);
+ LOG.debug("Unable to parse the value as a map or an array; reverting to string", e);
parser.rewindTo(startPosition);
}
- String token = parser.next().trim();
- assert !token.isEmpty(); // original can be empty string but is handled right away; no way for token to be empty here
+
+ String token = parser.next();
+ if (token.trim().isEmpty()) {
+ return new SchemaAndValue(Schema.STRING_SCHEMA, token);
+ }
+ token = token.trim();
+
char firstChar = token.charAt(0);
boolean firstCharIsDigit = Character.isDigit(firstChar);
if (firstCharIsDigit || firstChar == '+' || firstChar == '-') {
@@ -878,6 +924,9 @@ public class Values {
}
}
}
+ if (embedded) {
+ throw new DataException("Failed to parse embedded value");
+ }
// At this point, the only thing this can be is a string. Embedded strings were processed above,
// so this is not embedded and we can use the original string...
return new SchemaAndValue(Schema.STRING_SCHEMA, parser.original());
@@ -953,9 +1002,6 @@ public class Values {
}
protected static List<Object> alignListEntriesWithSchema(Schema schema, List<Object> input) {
- if (schema == null) {
- return input;
- }
Schema valueSchema = schema.valueSchema();
List<Object> result = new ArrayList<>();
for (Object value : input) {
@@ -966,9 +1012,6 @@ public class Values {
}
protected static Map<Object, Object> alignMapKeysAndValuesWithSchema(Schema mapSchema, Map<Object, Object> input) {
- if (mapSchema == null) {
- return input;
- }
Schema keySchema = mapSchema.keySchema();
Schema valueSchema = mapSchema.valueSchema();
Map<Object, Object> result = new LinkedHashMap<>();
@@ -980,6 +1023,16 @@ public class Values {
return result;
}
+ protected static Map<Object, Object> alignMapKeysWithSchema(Schema mapSchema, Map<Object, Object> input) {
+ Schema keySchema = mapSchema.keySchema();
+ Map<Object, Object> result = new LinkedHashMap<>();
+ for (Map.Entry<?, ?> entry : input.entrySet()) {
+ Object newKey = convertTo(keySchema, null, entry.getKey());
+ result.put(newKey, entry.getValue());
+ }
+ return result;
+ }
+
protected static class SchemaDetector {
private Type knownType = null;
private boolean optional = false;
@@ -1123,12 +1176,13 @@ public class Values {
nextToken = consumeNextToken();
}
if (ignoreLeadingAndTrailingWhitespace) {
- nextToken = nextToken.trim();
- while (nextToken.isEmpty() && canConsumeNextToken()) {
- nextToken = consumeNextToken().trim();
+ while (nextToken.trim().isEmpty() && canConsumeNextToken()) {
+ nextToken = consumeNextToken();
}
}
- return nextToken.equals(expected);
+ return ignoreLeadingAndTrailingWhitespace
+ ? nextToken.trim().equals(expected)
+ : nextToken.equals(expected);
}
}
}
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
index 162222d..a5909f3 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
@@ -22,6 +22,9 @@ import org.apache.kafka.connect.errors.DataException;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -35,6 +38,8 @@ import static org.junit.Assert.fail;
public class ValuesTest {
+ private static final String WHITESPACE = "\n \t \t\n";
+
private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
private static final Map<String, String> STRING_MAP = new LinkedHashMap<>();
@@ -68,6 +73,149 @@ public class ValuesTest {
}
@Test
+ public void shouldNotParseUnquotedEmbeddedMapKeysAsStrings() {
+ SchemaAndValue schemaAndValue = Values.parseString("{foo: 3}");
+ assertEquals(Type.STRING, schemaAndValue.schema().type());
+ assertEquals("{foo: 3}", schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldNotParseUnquotedEmbeddedMapValuesAsStrings() {
+ SchemaAndValue schemaAndValue = Values.parseString("{3: foo}");
+ assertEquals(Type.STRING, schemaAndValue.schema().type());
+ assertEquals("{3: foo}", schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldNotParseUnquotedArrayElementsAsStrings() {
+ SchemaAndValue schemaAndValue = Values.parseString("[foo]");
+ assertEquals(Type.STRING, schemaAndValue.schema().type());
+ assertEquals("[foo]", schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldNotParseStringsBeginningWithNullAsStrings() {
+ SchemaAndValue schemaAndValue = Values.parseString("null=");
+ assertEquals(Type.STRING, schemaAndValue.schema().type());
+ assertEquals("null=", schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldParseStringsBeginningWithTrueAsStrings() {
+ SchemaAndValue schemaAndValue = Values.parseString("true}");
+ assertEquals(Type.STRING, schemaAndValue.schema().type());
+ assertEquals("true}", schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldParseStringsBeginningWithFalseAsStrings() {
+ SchemaAndValue schemaAndValue = Values.parseString("false]");
+ assertEquals(Type.STRING, schemaAndValue.schema().type());
+ assertEquals("false]", schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldParseTrueAsBooleanIfSurroundedByWhitespace() {
+ SchemaAndValue schemaAndValue = Values.parseString(WHITESPACE + "true" + WHITESPACE);
+ assertEquals(Type.BOOLEAN, schemaAndValue.schema().type());
+ assertEquals(true, schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldParseFalseAsBooleanIfSurroundedByWhitespace() {
+ SchemaAndValue schemaAndValue = Values.parseString(WHITESPACE + "false" + WHITESPACE);
+ assertEquals(Type.BOOLEAN, schemaAndValue.schema().type());
+ assertEquals(false, schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldParseNullAsNullIfSurroundedByWhitespace() {
+ SchemaAndValue schemaAndValue = Values.parseString(WHITESPACE + "null" + WHITESPACE);
+ assertNull(schemaAndValue);
+ }
+
+ @Test
+ public void shouldParseBooleanLiteralsEmbeddedInArray() {
+ SchemaAndValue schemaAndValue = Values.parseString("[true, false]");
+ assertEquals(Type.ARRAY, schemaAndValue.schema().type());
+ assertEquals(Type.BOOLEAN, schemaAndValue.schema().valueSchema().type());
+ assertEquals(Arrays.asList(true, false), schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldParseBooleanLiteralsEmbeddedInMap() {
+ SchemaAndValue schemaAndValue = Values.parseString("{true: false, false: true}");
+ assertEquals(Type.MAP, schemaAndValue.schema().type());
+ assertEquals(Type.BOOLEAN, schemaAndValue.schema().keySchema().type());
+ assertEquals(Type.BOOLEAN, schemaAndValue.schema().valueSchema().type());
+ Map<Boolean, Boolean> expectedValue = new HashMap<>();
+ expectedValue.put(true, false);
+ expectedValue.put(false, true);
+ assertEquals(expectedValue, schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldNotParseAsMapWithoutCommas() {
+ SchemaAndValue schemaAndValue = Values.parseString("{6:9 4:20}");
+ assertEquals(Type.STRING, schemaAndValue.schema().type());
+ assertEquals("{6:9 4:20}", schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldNotParseAsArrayWithoutCommas() {
+ SchemaAndValue schemaAndValue = Values.parseString("[0 1 2]");
+ assertEquals(Type.STRING, schemaAndValue.schema().type());
+ assertEquals("[0 1 2]", schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldParseEmptyMap() {
+ SchemaAndValue schemaAndValue = Values.parseString("{}");
+ assertEquals(Type.MAP, schemaAndValue.schema().type());
+ assertEquals(Collections.emptyMap(), schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldParseEmptyArray() {
+ SchemaAndValue schemaAndValue = Values.parseString("[]");
+ assertEquals(Type.ARRAY, schemaAndValue.schema().type());
+ assertEquals(Collections.emptyList(), schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldNotParseAsMapWithNullKeys() {
+ SchemaAndValue schemaAndValue = Values.parseString("{null: 3}");
+ assertEquals(Type.STRING, schemaAndValue.schema().type());
+ assertEquals("{null: 3}", schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldParseNull() {
+ SchemaAndValue schemaAndValue = Values.parseString("null");
+ assertNull(schemaAndValue);
+ }
+
+ @Test
+ public void shouldConvertStringOfNull() {
+ assertRoundTrip(Schema.STRING_SCHEMA, "null");
+ }
+
+ @Test
+ public void shouldParseNullMapValues() {
+ SchemaAndValue schemaAndValue = Values.parseString("{3: null}");
+ assertEquals(Type.MAP, schemaAndValue.schema().type());
+ assertEquals(Type.INT8, schemaAndValue.schema().keySchema().type());
+ assertEquals(Collections.singletonMap((byte) 3, null), schemaAndValue.value());
+ }
+
+ @Test
+ public void shouldParseNullArrayElements() {
+ SchemaAndValue schemaAndValue = Values.parseString("[null]");
+ assertEquals(Type.ARRAY, schemaAndValue.schema().type());
+ assertEquals(Collections.singletonList(null), schemaAndValue.value());
+ }
+
+ @Test
public void shouldEscapeStringsWithEmbeddedQuotesAndBackslashes() {
String original = "three\"blind\\\"mice";
String expected = "three\\\"blind\\\\\\\"mice";
@@ -214,7 +362,8 @@ public class ValuesTest {
public void shouldParseStringListWithMultipleElementTypesAndReturnListWithNoSchema() {
String str = "[1, 2, 3, \"four\"]";
SchemaAndValue result = Values.parseString(str);
- assertNull(result.schema());
+ assertEquals(Type.ARRAY, result.schema().type());
+ assertNull(result.schema().valueSchema());
List<?> list = (List<?>) result.value();
assertEquals(4, list.size());
assertEquals(1, ((Number) list.get(0)).intValue());
@@ -256,7 +405,7 @@ public class ValuesTest {
*/
@Test(expected = DataException.class)
public void shouldFailToParseStringOfMapWithIntValuesWithBlankEntry() {
- Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" : 1234567890 ,, \"bar\" : 0, \"baz\" : -987654321 } ");
+ Values.convertToMap(Schema.STRING_SCHEMA, " { \"foo\" : 1234567890 ,, \"bar\" : 0, \"baz\" : -987654321 } ");
}
/**
@@ -264,7 +413,7 @@ public class ValuesTest {
*/
@Test(expected = DataException.class)
public void shouldFailToParseStringOfMalformedMap() {
- Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" : 1234567890 , \"a\", \"bar\" : 0, \"baz\" : -987654321 } ");
+ Values.convertToMap(Schema.STRING_SCHEMA, " { \"foo\" : 1234567890 , \"a\", \"bar\" : 0, \"baz\" : -987654321 } ");
}
/**
@@ -272,7 +421,7 @@ public class ValuesTest {
*/
@Test(expected = DataException.class)
public void shouldFailToParseStringOfMapWithIntValuesWithOnlyBlankEntries() {
- Values.convertToList(Schema.STRING_SCHEMA, " { ,, , , } ");
+ Values.convertToMap(Schema.STRING_SCHEMA, " { ,, , , } ");
}
/**
@@ -280,15 +429,7 @@ public class ValuesTest {
*/
@Test(expected = DataException.class)
public void shouldFailToParseStringOfMapWithIntValuesWithBlankEntries() {
- Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" : \"1234567890\" ,, \"bar\" : \"0\", \"baz\" : \"boz\" } ");
- }
-
- /**
- * Schema for Map requires a schema for key and value, but we have no key or value and Connect has no "any" type
- */
- @Test(expected = DataException.class)
- public void shouldFailToParseStringOfEmptyMap() {
- Values.convertToList(Schema.STRING_SCHEMA, " { } ");
+ Values.convertToMap(Schema.STRING_SCHEMA, " { \"foo\" : \"1234567890\" ,, \"bar\" : \"0\", \"baz\" : \"boz\" } ");
}
@Test
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java
index fdfdd32..58a17f5 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java
@@ -164,11 +164,15 @@ public class SimpleHeaderConverterTest {
}
@Test
- public void shouldConvertMapWithStringKeysAndMixedValuesToMapWithoutSchema() {
+ public void shouldConvertMapWithStringKeysAndMixedValuesToMap() {
Map<String, Object> map = new LinkedHashMap<>();
map.put("foo", "bar");
map.put("baz", (short) 3456);
- assertRoundTrip(null, map);
+ SchemaAndValue result = roundTrip(null, map);
+ assertEquals(Schema.Type.MAP, result.schema().type());
+ assertEquals(Schema.Type.STRING, result.schema().keySchema().type());
+ assertNull(result.schema().valueSchema());
+ assertEquals(map, result.value());
}
@Test
@@ -176,17 +180,29 @@ public class SimpleHeaderConverterTest {
List<Object> list = new ArrayList<>();
list.add("foo");
list.add((short) 13344);
- assertRoundTrip(null, list);
+ SchemaAndValue result = roundTrip(null, list);
+ assertEquals(Schema.Type.ARRAY, result.schema().type());
+ assertNull(result.schema().valueSchema());
+ assertEquals(list, result.value());
}
@Test
- public void shouldConvertEmptyMapToMapWithoutSchema() {
- assertRoundTrip(null, new LinkedHashMap<>());
+ public void shouldConvertEmptyMapToMap() {
+ Map<Object, Object> map = new LinkedHashMap<>();
+ SchemaAndValue result = roundTrip(null, map);
+ assertEquals(Schema.Type.MAP, result.schema().type());
+ assertNull(result.schema().keySchema());
+ assertNull(result.schema().valueSchema());
+ assertEquals(map, result.value());
}
@Test
- public void shouldConvertEmptyListToListWithoutSchema() {
- assertRoundTrip(null, new ArrayList<>());
+ public void shouldConvertEmptyListToList() {
+ List<Object> list = new ArrayList<>();
+ SchemaAndValue result = roundTrip(null, list);
+ assertEquals(Schema.Type.ARRAY, result.schema().type());
+ assertNull(result.schema().valueSchema());
+ assertEquals(list, result.value());
}
protected SchemaAndValue roundTrip(Schema schema, Object input) {