Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/01 05:27:00 UTC

[jira] [Commented] (KAFKA-6684) Support casting values with bytes schema to string

    [ https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633619#comment-16633619 ] 

ASF GitHub Bot commented on KAFKA-6684:
---------------------------------------

ewencp closed pull request #4820: KAFKA-6684: Cast transform bytes
URL: https://github.com/apache/kafka/pull/4820
 
This is a PR merged from a forked repository. Because GitHub hides the
original diff once a fork's PR is merged, the diff is reproduced below
for the sake of provenance:

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index ceb176858d7..c2bd9f480f5 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -70,7 +70,7 @@
     private static final String FALSE_LITERAL = Boolean.FALSE.toString();
     private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
     private static final String NULL_VALUE = "null";
-    private static final String ISO_8601_DATE_FORMAT_PATTERN = "YYYY-MM-DD";
+    private static final String ISO_8601_DATE_FORMAT_PATTERN = "yyyy-MM-dd";
     private static final String ISO_8601_TIME_FORMAT_PATTERN = "HH:mm:ss.SSS'Z'";
     private static final String ISO_8601_TIMESTAMP_FORMAT_PATTERN = ISO_8601_DATE_FORMAT_PATTERN + "'T'" + ISO_8601_TIME_FORMAT_PATTERN;
 
@@ -713,7 +713,7 @@ protected static String escape(String value) {
         return DOUBLEQOUTE.matcher(replace1).replaceAll("\\\\\"");
     }
 
-    protected static DateFormat dateFormatFor(java.util.Date value) {
+    public static DateFormat dateFormatFor(java.util.Date value) {
         if (value.getTime() < MILLIS_PER_DAY) {
             return new SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN);
         }
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index a593c7b3934..07ccd37892f 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -28,6 +28,7 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Values;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.transforms.util.SchemaUtil;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -78,9 +79,16 @@ public String toString() {
 
     private static final String PURPOSE = "cast types";
 
-    private static final Set<Schema.Type> SUPPORTED_CAST_TYPES = EnumSet.of(
+    private static final Set<Schema.Type> SUPPORTED_CAST_INPUT_TYPES = EnumSet.of(
             Schema.Type.INT8, Schema.Type.INT16, Schema.Type.INT32, Schema.Type.INT64,
-                    Schema.Type.FLOAT32, Schema.Type.FLOAT64, Schema.Type.BOOLEAN, Schema.Type.STRING
+                    Schema.Type.FLOAT32, Schema.Type.FLOAT64, Schema.Type.BOOLEAN,
+                            Schema.Type.STRING, Schema.Type.BYTES
+    );
+
+    private static final Set<Schema.Type> SUPPORTED_CAST_OUTPUT_TYPES = EnumSet.of(
+            Schema.Type.INT8, Schema.Type.INT16, Schema.Type.INT32, Schema.Type.INT64,
+                    Schema.Type.FLOAT32, Schema.Type.FLOAT64, Schema.Type.BOOLEAN,
+                            Schema.Type.STRING
     );
 
     // As a special case for casting the entire value (e.g. the incoming key is a int64 but you know it could be an
@@ -120,14 +128,14 @@ public void close() {
 
     private R applySchemaless(R record) {
         if (wholeValueCastType != null) {
-            return newRecord(record, null, castValueToType(operatingValue(record), wholeValueCastType));
+            return newRecord(record, null, castValueToType(null, operatingValue(record), wholeValueCastType));
         }
 
         final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
         final HashMap<String, Object> updatedValue = new HashMap<>(value);
         for (Map.Entry<String, Schema.Type> fieldSpec : casts.entrySet()) {
             String field = fieldSpec.getKey();
-            updatedValue.put(field, castValueToType(value.get(field), fieldSpec.getValue()));
+            updatedValue.put(field, castValueToType(null, value.get(field), fieldSpec.getValue()));
         }
         return newRecord(record, null, updatedValue);
     }
@@ -138,7 +146,7 @@ private R applyWithSchema(R record) {
 
         // Whole-record casting
         if (wholeValueCastType != null)
-            return newRecord(record, updatedSchema, castValueToType(operatingValue(record), wholeValueCastType));
+            return newRecord(record, updatedSchema, castValueToType(valueSchema, operatingValue(record), wholeValueCastType));
 
         // Casting within a struct
         final Struct value = requireStruct(operatingValue(record), PURPOSE);
@@ -147,7 +155,7 @@ private R applyWithSchema(R record) {
         for (Field field : value.schema().fields()) {
             final Object origFieldValue = value.get(field);
             final Schema.Type targetType = casts.get(field.name());
-            final Object newFieldValue = targetType != null ? castValueToType(origFieldValue, targetType) : origFieldValue;
+            final Object newFieldValue = targetType != null ? castValueToType(field.schema(), origFieldValue, targetType) : origFieldValue;
             updatedValue.put(updatedSchema.field(field.name()), newFieldValue);
         }
         return newRecord(record, updatedSchema, updatedValue);
@@ -168,8 +176,10 @@ private Schema getOrBuildSchema(Schema valueSchema) {
                     SchemaBuilder fieldBuilder = convertFieldType(casts.get(field.name()));
                     if (field.schema().isOptional())
                         fieldBuilder.optional();
-                    if (field.schema().defaultValue() != null)
-                        fieldBuilder.defaultValue(castValueToType(field.schema().defaultValue(), fieldBuilder.type()));
+                    if (field.schema().defaultValue() != null) {
+                        Schema fieldSchema = field.schema();
+                        fieldBuilder.defaultValue(castValueToType(fieldSchema, fieldSchema.defaultValue(), fieldBuilder.type()));
+                    }
                     builder.field(field.name(), fieldBuilder.build());
                 } else {
                     builder.field(field.name(), field.schema());
@@ -180,7 +190,7 @@ private Schema getOrBuildSchema(Schema valueSchema) {
         if (valueSchema.isOptional())
             builder.optional();
         if (valueSchema.defaultValue() != null)
-            builder.defaultValue(castValueToType(valueSchema.defaultValue(), builder.type()));
+            builder.defaultValue(castValueToType(valueSchema, valueSchema.defaultValue(), builder.type()));
 
         updatedSchema = builder.build();
         schemaUpdateCache.put(valueSchema, updatedSchema);
@@ -211,11 +221,12 @@ private SchemaBuilder convertFieldType(Schema.Type type) {
 
     }
 
-    private static Object castValueToType(Object value, Schema.Type targetType) {
+    private static Object castValueToType(Schema schema, Object value, Schema.Type targetType) {
         try {
             if (value == null) return null;
 
-            Schema.Type inferredType = ConnectSchema.schemaType(value.getClass());
+            Schema.Type inferredType = schema == null ? ConnectSchema.schemaType(value.getClass()) :
+                    schema.type();
             if (inferredType == null) {
                 throw new DataException("Cast transformation was passed a value of type " + value.getClass()
                         + " which is not supported by Connect's data API");
@@ -326,7 +337,12 @@ else if (value instanceof String)
     }
 
     private static String castToString(Object value) {
-        return value.toString();
+        if (value instanceof java.util.Date) {
+            java.util.Date dateValue = (java.util.Date) value;
+            return Values.dateFormatFor(dateValue).format(dateValue);
+        } else {
+            return value.toString();
+        }
     }
 
     protected abstract Schema operatingSchema(R record);
@@ -369,15 +385,19 @@ private static String castToString(Object value) {
     }
 
     private static Schema.Type validCastType(Schema.Type type, FieldType fieldType) {
-        if (!SUPPORTED_CAST_TYPES.contains(type)) {
-            String message = "Cast transformation does not support casting to/from " + type
-                    + "; supported types are " + SUPPORTED_CAST_TYPES;
-            switch (fieldType) {
-                case INPUT:
-                    throw new DataException(message);
-                case OUTPUT:
-                    throw new ConfigException(message);
-            }
+        switch (fieldType) {
+            case INPUT:
+                if (!SUPPORTED_CAST_INPUT_TYPES.contains(type)) {
+                    throw new DataException("Cast transformation does not support casting from " +
+                        type + "; supported types are " + SUPPORTED_CAST_INPUT_TYPES);
+                }
+                break;
+            case OUTPUT:
+                if (!SUPPORTED_CAST_OUTPUT_TYPES.contains(type)) {
+                    throw new ConfigException("Cast transformation does not support casting to " +
+                        type + "; supported types are " + SUPPORTED_CAST_OUTPUT_TYPES);
+                }
+                break;
         }
         return type;
     }
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
index 06fbe311c16..c568afb9ff4 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
@@ -18,15 +18,18 @@
 package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.data.Values;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.After;
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -39,6 +42,7 @@
 public class CastTest {
     private final Cast<SourceRecord> xformKey = new Cast.Key<>();
     private final Cast<SourceRecord> xformValue = new Cast.Value<>();
+    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
 
     @After
     public void teardown() {
@@ -61,6 +65,11 @@ public void testConfigInvalidTargetType() {
         xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:array"));
     }
 
+    @Test(expected = ConfigException.class)
+    public void testUnsupportedTargetType() {
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:bytes"));
+    }
+
     @Test(expected = ConfigException.class)
     public void testConfigInvalidMap() {
         xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra"));
@@ -171,6 +180,28 @@ public void castWholeRecordValueWithSchemaString() {
         assertEquals("42", transformed.value());
     }
 
+    @Test
+    public void castWholeBigDecimalRecordValueWithSchemaString() {
+        BigDecimal bigDecimal = new BigDecimal(42);
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
+                Decimal.schema(bigDecimal.scale()), bigDecimal));
+
+        assertEquals(Schema.Type.STRING, transformed.valueSchema().type());
+        assertEquals("42", transformed.value());
+    }
+
+    @Test
+    public void castWholeDateRecordValueWithSchemaString() {
+        Date timestamp = new Date(MILLIS_PER_DAY + 1); // one day + 1 ms, so dateFormatFor selects the full timestamp format
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
+                Timestamp.SCHEMA, timestamp));
+
+        assertEquals(Schema.Type.STRING, transformed.valueSchema().type());
+        assertEquals(Values.dateFormatFor(timestamp).format(timestamp), transformed.value());
+    }
+
     @Test
     public void castWholeRecordDefaultValue() {
         // Validate default value in schema is correctly converted
@@ -292,7 +323,8 @@ public void castWholeRecordValueSchemalessUnsupportedType() {
 
     @Test
     public void castFieldsWithSchema() {
-        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,optional:int32"));
+        Date day = new Date(MILLIS_PER_DAY);
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,bigdecimal:string,date:string,optional:int32"));
 
         // Include optional fields and fields with defaults to validate their values are passed through properly
         SchemaBuilder builder = SchemaBuilder.struct();
@@ -305,6 +337,8 @@ public void castFieldsWithSchema() {
         builder.field("float64", SchemaBuilder.float64().defaultValue(-1.125).build());
         builder.field("boolean", Schema.BOOLEAN_SCHEMA);
         builder.field("string", Schema.STRING_SCHEMA);
+        builder.field("bigdecimal", Decimal.schema(new BigDecimal(42).scale()));
+        builder.field("date", Timestamp.SCHEMA);
         builder.field("optional", Schema.OPTIONAL_FLOAT32_SCHEMA);
         builder.field("timestamp", Timestamp.SCHEMA);
         Schema supportedTypesSchema = builder.build();
@@ -317,6 +351,8 @@ public void castFieldsWithSchema() {
         recordValue.put("float32", 32.f);
         recordValue.put("float64", -64.);
         recordValue.put("boolean", true);
+        recordValue.put("bigdecimal", new BigDecimal(42));
+        recordValue.put("date", day);
         recordValue.put("string", "42");
         recordValue.put("timestamp", new Date(0));
         // optional field intentionally omitted
@@ -335,6 +371,8 @@ public void castFieldsWithSchema() {
         assertEquals(true, ((Struct) transformed.value()).schema().field("float64").schema().defaultValue());
         assertEquals((byte) 1, ((Struct) transformed.value()).get("boolean"));
         assertEquals(42, ((Struct) transformed.value()).get("string"));
+        assertEquals("42", ((Struct) transformed.value()).get("bigdecimal"));
+        assertEquals(Values.dateFormatFor(day).format(day), ((Struct) transformed.value()).get("date"));
         assertEquals(new Date(0), ((Struct) transformed.value()).get("timestamp"));
         assertNull(((Struct) transformed.value()).get("optional"));
 
@@ -347,6 +385,8 @@ public void castFieldsWithSchema() {
         assertEquals(Schema.BOOLEAN_SCHEMA.type(), transformedSchema.field("float64").schema().type());
         assertEquals(Schema.INT8_SCHEMA.type(), transformedSchema.field("boolean").schema().type());
         assertEquals(Schema.INT32_SCHEMA.type(), transformedSchema.field("string").schema().type());
+        assertEquals(Schema.STRING_SCHEMA.type(), transformedSchema.field("bigdecimal").schema().type());
+        assertEquals(Schema.STRING_SCHEMA.type(), transformedSchema.field("date").schema().type());
         assertEquals(Schema.OPTIONAL_INT32_SCHEMA.type(), transformedSchema.field("optional").schema().type());
         // The following fields are not changed
         assertEquals(Timestamp.SCHEMA.type(), transformedSchema.field("timestamp").schema().type());

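Two notes on the patch for readers skimming the diff. First, in SimpleDateFormat uppercase YYYY means week-based year and DD means day-of-year, so the old "YYYY-MM-DD" pattern could render wrong dates around year boundaries; the "yyyy-MM-dd" fix above corrects that. Second, to make the new cast behavior concrete, here is a minimal standalone sketch (not part of the PR; it assumes a Connect classpath containing this patch, and the topic name and values are illustrative):

    import org.apache.kafka.connect.data.Decimal;
    import org.apache.kafka.connect.data.Timestamp;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.transforms.Cast;

    import java.math.BigDecimal;
    import java.util.Collections;
    import java.util.Date;

    public class CastBytesToStringExample {
        public static void main(String[] args) {
            Cast<SourceRecord> cast = new Cast.Value<>();
            // "spec" is Cast.SPEC_CONFIG; a bare type name means cast the whole value.
            cast.configure(Collections.singletonMap("spec", "string"));

            // Decimal is a logical type backed by BYTES; it now casts to its numeric string.
            BigDecimal amount = new BigDecimal("42");
            SourceRecord decimalRecord = new SourceRecord(null, null, "topic", 0,
                    Decimal.schema(amount.scale()), amount);
            System.out.println(cast.apply(decimalRecord).value());   // prints: 42

            // Timestamp casts to ISO 8601 via the now-public Values.dateFormatFor.
            Date ts = new Date(24L * 60 * 60 * 1000 + 1);            // one day + 1 ms
            SourceRecord tsRecord = new SourceRecord(null, null, "topic", 0,
                    Timestamp.SCHEMA, ts);
            System.out.println(cast.apply(tsRecord).value());        // e.g. 1970-01-02T00:00:00.001Z
                                                                     // (formatted in the JVM default time zone)
            cast.close();
        }
    }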

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Support casting values with bytes schema to string 
> ---------------------------------------------------
>
>                 Key: KAFKA-6684
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6684
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Amit Sela
>            Priority: Critical
>             Fix For: 2.1.0
>
>
> Casting from BYTES is not supported, which means that logical types (whose underlying schema type is BYTES) cannot be cast either.
> This proposes allowing anything to be cast to a string, much like Java's {{toString()}}, so that if the object is actually a logical type it can be "serialized" as a string instead of bytes+schema.
>  
> {noformat}
> Examples:
> BigDecimal will cast to the string representation of the number.
> Timestamp will cast to the string representation of the timestamp, or maybe UTC yyyymmddTHH:MM:SS.f format?
> {noformat}
>  
> Worst case, bytes are "cast" to whatever {{toString()}} returns; it's up to the user to know the data.
> This would help when using a JSON sink, or anything that's not Avro.
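To make the JSON-sink use case above concrete: with this merged, a BYTES-backed logical-type field can be cast to a string on its way to a non-Avro sink using the stock Cast SMT. A minimal connector-config sketch (not from the PR; the transform alias castPrice and the field name price are hypothetical):

    transforms=castPrice
    transforms.castPrice.type=org.apache.kafka.connect.transforms.Cast$Value
    transforms.castPrice.spec=price:string

Fields not named in spec pass through unchanged, as the castFieldsWithSchema test in the diff exercises.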



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)