Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/09/22 13:37:36 UTC

[pulsar] branch master updated: Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly (#11905)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ebad71a  Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly (#11905)
ebad71a is described below

commit ebad71aed7991e4494f81cf1980408f5bbff980a
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Wed Sep 22 06:36:11 2021 -0700

    Fixed KCA Sink handling of Json and Avro; support for kafka connectors that overload task.preCommit() directly (#11905)
---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |  16 ++-
 .../io/kafka/connect/schema/KafkaConnectData.java  | 160 +++++++++++++++++++++
 .../io/kafka/connect/KafkaConnectSinkTest.java     |  98 ++++++++++---
 .../kafka/connect/SchemaedFileStreamSinkTask.java  |  16 ++-
 4 files changed, 268 insertions(+), 22 deletions(-)
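
Context for the preCommit() part of this change: a Kafka Connect sink task may
override SinkTask.preCommit() to manage offset commits itself instead of relying
on flush(). A minimal sketch of such a task (hypothetical class and method
names, for illustration only; not part of this patch):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkTask;

    // Hypothetical task that defers offset commits until its own
    // asynchronous writes have completed.
    public abstract class AsyncBatchingSinkTask extends SinkTask {
        @Override
        public Map<TopicPartition, OffsetAndMetadata> preCommit(
                Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
            if (!writesCompleted()) {
                // Nothing is safe to commit yet; an empty map tells the
                // framework to skip this commit and retry later.
                return Collections.emptyMap();
            }
            return currentOffsets;
        }

        protected abstract boolean writesCompleted();
    }

Previously the adaptor called task.flush() directly, bypassing overrides like
the one above; the first hunk below routes flushing through preCommit() and
skips the offset flush when the task returns an empty map.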

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 8b60f54..268105c 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
 import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
 
 import java.util.List;
@@ -199,9 +200,14 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         final Record<GenericObject> lastNotFlushed = pendingFlushQueue.getLast();
         try {
             Map<TopicPartition, OffsetAndMetadata> currentOffsets = taskContext.currentOffsets();
-            task.flush(currentOffsets);
+            Map<TopicPartition, OffsetAndMetadata> committedOffsets = task.preCommit(currentOffsets);
+            if (committedOffsets == null || committedOffsets.isEmpty()) {
+                log.info("Task returned empty committedOffsets map; skipping flush; task will retry later");
+                return;
+            }
             taskContext.flushOffsets(currentOffsets);
             ackUntil(lastNotFlushed, Record::ack);
+            log.info("Flush succeeded");
         } catch (Throwable t) {
             log.error("error flushing pending records", t);
             ackUntil(lastNotFlushed, Record::fail);
@@ -222,7 +228,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
     }
 
     @SuppressWarnings("rawtypes")
-    private SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
+    protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
         final int partition = sourceRecord.getPartitionIndex().orElse(0);
         final String topic = sourceRecord.getTopicName().orElse(topicName);
         final Object key;
@@ -237,10 +243,10 @@ public class KafkaConnectSink implements Sink<GenericObject> {
                 && sourceRecord.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
             KeyValueSchema kvSchema = (KeyValueSchema) sourceRecord.getSchema();
             KeyValue kv = (KeyValue) sourceRecord.getValue().getNativeObject();
-            key = kv.getKey();
-            value = kv.getValue();
             keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
             valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
+            key = kv.getKey();
+            value = kv.getValue();
         } else {
             if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
                 key = sourceRecord.getMessage().get().getKeyBytes();
@@ -250,7 +256,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
                 keySchema = Schema.STRING_SCHEMA;
             }
             valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(sourceRecord.getSchema());
-            value = sourceRecord.getValue().getNativeObject();
+            value = KafkaConnectData.getKafkaConnectData(sourceRecord.getValue().getNativeObject(), valueSchema);
         }
 
         long offset = sourceRecord.getRecordSequence()
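
For reference, the default SinkTask.preCommit() behaves roughly as sketched
below (paraphrased from the Kafka Connect API, not part of this patch), which
is why connectors that never override it keep their existing flush-then-commit
behavior:

    // Paraphrased default: flush everything that was handed in and
    // approve those same offsets for commit.
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        flush(currentOffsets);
        return currentOffsets;
    }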
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
new file mode 100644
index 0000000..b649a9b
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect.schema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import org.apache.avro.generic.GenericData;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class KafkaConnectData {
+    public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
+        if (kafkaSchema == null) {
+            return nativeObject;
+        }
+
+        if (nativeObject == null) {
+            return defaultOrThrow(kafkaSchema);
+        }
+
+        if (nativeObject instanceof JsonNode) {
+            JsonNode node = (JsonNode) nativeObject;
+            return jsonAsConnectData(node, kafkaSchema);
+        } else if (nativeObject instanceof GenericData.Record) {
+            GenericData.Record avroRecord = (GenericData.Record) nativeObject;
+            return avroAsConnectData(avroRecord, kafkaSchema);
+        }
+
+        return nativeObject;
+    }
+
+    static Object avroAsConnectData(GenericData.Record avroRecord, Schema kafkaSchema) {
+        if (kafkaSchema == null) {
+            if (avroRecord == null) {
+                return null;
+            }
+            throw new DataException("Don't know how to convert " + avroRecord + " to Connect data (schema is null).");
+        }
+
+        Struct struct = new Struct(kafkaSchema);
+        for (Field field : kafkaSchema.fields()) {
+            struct.put(field, getKafkaConnectData(avroRecord.get(field.name()), field.schema()));
+        }
+        return struct;
+    }
+
+    // Adapted in part from
+    // https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+    static Object jsonAsConnectData(JsonNode jsonNode, Schema kafkaSchema) {
+        if (kafkaSchema == null) {
+            if (jsonNode == null || jsonNode.isNull()) {
+                return null;
+            }
+            switch (jsonNode.getNodeType()) {
+                case BINARY:
+                    try {
+                        return jsonNode.binaryValue();
+                    } catch (IOException e) {
+                        throw new DataException("Cannot get binary value for " + jsonNode);
+                    }
+                case BOOLEAN:
+                    return jsonNode.booleanValue();
+                case NUMBER:
+                    return jsonNode.doubleValue();
+                case STRING:
+                    return jsonNode.textValue();
+                default:
+                    throw new DataException("Don't know how to convert " + jsonNode +
+                            " to Connect data (schema is null).");
+            }
+        }
+
+        if (jsonNode == null || jsonNode.isNull()) {
+            return defaultOrThrow(kafkaSchema);
+        }
+
+        switch (kafkaSchema.type()) {
+            case INT8:
+                return (byte)jsonNode.shortValue();
+            case INT16:
+                return jsonNode.shortValue();
+            case INT32:
+                return jsonNode.intValue();
+            case INT64:
+                return jsonNode.longValue();
+            case FLOAT32:
+                return jsonNode.floatValue();
+            case FLOAT64:
+                return jsonNode.doubleValue();
+            case BOOLEAN:
+                return jsonNode.booleanValue();
+            case STRING:
+                return jsonNode.textValue();
+            case BYTES:
+                try {
+                    return jsonNode.binaryValue();
+                } catch (IOException e) {
+                    throw new DataException("Cannot get binary value for " + jsonNode + " with schema " + kafkaSchema);
+                }
+            case ARRAY:
+                List<Object> list = new ArrayList<>();
+                Preconditions.checkArgument(jsonNode.isArray(), "jsonNode has to be an array");
+                for (Iterator<JsonNode> it = jsonNode.elements(); it.hasNext(); ) {
+                    list.add(jsonAsConnectData(it.next(), kafkaSchema.valueSchema()));
+                }
+                return list;
+            case MAP:
+                Map<String, Object> map = new HashMap<>();
+                for (Iterator<Map.Entry<String, JsonNode>> it = jsonNode.fields(); it.hasNext(); ) {
+                    Map.Entry<String, JsonNode> elem = it.next();
+                    map.put(elem.getKey(), jsonAsConnectData(elem.getValue(), kafkaSchema.valueSchema()));
+                }
+                return map;
+            case STRUCT:
+                Struct struct = new Struct(kafkaSchema);
+                for (Field field: kafkaSchema.fields()) {
+                    struct.put(field, jsonAsConnectData(jsonNode.get(field.name()), field.schema()));
+                }
+                return struct;
+            default:
+                throw new DataException("Unknown schema type " + kafkaSchema.type());
+        }
+    }
+
+    private static Object defaultOrThrow(Schema kafkaSchema) {
+        if (kafkaSchema.defaultValue() != null) {
+            return kafkaSchema.defaultValue(); // any logical type conversions should already have been applied
+        }
+        if (kafkaSchema.isOptional()) {
+            return null;
+        }
+        throw new DataException("Invalid null value for required " + kafkaSchema.type() + " field");
+    }
+}
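
The new helper can be exercised on its own. A minimal sketch, with the schema,
payload, and class name invented for illustration:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;

    public class KafkaConnectDataExample {
        public static void main(String[] args) throws Exception {
            // Connect schema matching the JSON payload below.
            Schema schema = SchemaBuilder.struct()
                    .field("field1", Schema.INT32_SCHEMA)
                    .field("field2", Schema.STRING_SCHEMA)
                    .build();

            JsonNode node = new ObjectMapper()
                    .readTree("{\"field1\": 10, \"field2\": \"test\"}");

            // JsonNode in, Connect Struct out; field types follow the schema.
            Struct struct = (Struct) KafkaConnectData.getKafkaConnectData(node, schema);
            System.out.println(struct.getInt32("field1"));   // 10
            System.out.println(struct.getString("field2"));  // test
        }
    }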
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 92d2a25..b561e4b 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -20,9 +20,13 @@
 package org.apache.pulsar.io.kafka.connect;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericData;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
@@ -34,13 +38,17 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
+import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
 import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -124,7 +132,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         final GenericRecord rec = getGenericRecord("value", Schema.STRING);
         Message msg = mock(MessageImpl.class);
         when(msg.getValue()).thenReturn(rec);
-        when(msg.getMessageId()).thenReturn(new MessageIdImpl(0, 0, 0));
+        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
 
         final AtomicInteger status = new AtomicInteger(0);
         Record<GenericObject> record = PulsarRecord.<String>builder()
@@ -230,11 +238,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
 
     }
 
-    private void recordSchemaTest(Object value, Schema schema, Object expected, String expectedSchema) throws Exception {
-        recordSchemaTest(value, schema, "key",  "STRING", expected, expectedSchema);
+    private SinkRecord recordSchemaTest(Object value, Schema schema, Object expected, String expectedSchema) throws Exception {
+        return recordSchemaTest(value, schema, "key",  "STRING", expected, expectedSchema);
     }
 
-    private void recordSchemaTest(Object value, Schema schema, Object expectedKey, String expectedKeySchema,
+    private SinkRecord recordSchemaTest(Object value, Schema schema, Object expectedKey, String expectedKeySchema,
                                   Object expected, String expectedSchema) throws Exception {
         props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
 
@@ -246,7 +254,7 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         when(msg.getValue()).thenReturn(rec);
         when(msg.getKey()).thenReturn("key");
         when(msg.hasKey()).thenReturn(true);
-        when(msg.getMessageId()).thenReturn(new MessageIdImpl(0, 0, 0));
+        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
 
         final AtomicInteger status = new AtomicInteger(0);
         Record<GenericObject> record = PulsarRecord.<String>builder()
@@ -272,6 +280,9 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         assertEquals(result.get("value"), expected);
         assertEquals(result.get("keySchema"), expectedKeySchema);
         assertEquals(result.get("valueSchema"), expectedSchema);
+
+        SinkRecord sinkRecord = sink.toSinkRecord(record);
+        return sinkRecord;
     }
 
     private GenericRecord getGenericRecord(Object value, Schema schema) {
@@ -289,50 +300,69 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
 
     @Test
     public void bytesRecordSchemaTest() throws Exception {
-        recordSchemaTest("val".getBytes(StandardCharsets.US_ASCII), Schema.BYTES, "val", "BYTES");
+        byte[] in = "val".getBytes(StandardCharsets.US_ASCII);
+        SinkRecord sinkRecord = recordSchemaTest(in, Schema.BYTES, "val", "BYTES");
+        byte[] out = (byte[]) sinkRecord.value();
+        Assert.assertEquals(out, in);
     }
 
     @Test
     public void stringRecordSchemaTest() throws Exception {
-        recordSchemaTest("val", Schema.STRING, "val", "STRING");
+        SinkRecord sinkRecord = recordSchemaTest("val", Schema.STRING, "val", "STRING");
+        String out = (String) sinkRecord.value();
+        Assert.assertEquals(out, "val");
     }
 
     @Test
     public void booleanRecordSchemaTest() throws Exception {
-        recordSchemaTest(true, Schema.BOOL, true, "BOOLEAN");
+        SinkRecord sinkRecord = recordSchemaTest(true, Schema.BOOL, true, "BOOLEAN");
+        boolean out = (boolean) sinkRecord.value();
+        Assert.assertEquals(out, true);
     }
 
     @Test
     public void byteRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
-        recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
+        SinkRecord sinkRecord = recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
+        byte out = (byte) sinkRecord.value();
+        Assert.assertEquals(out, 1);
     }
 
     @Test
     public void shortRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
-        recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
+        SinkRecord sinkRecord = recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
+        short out = (short) sinkRecord.value();
+        Assert.assertEquals(out, 1);
     }
 
     @Test
     public void integerRecordSchemaTest() throws Exception {
-        recordSchemaTest(Integer.MAX_VALUE, Schema.INT32, Integer.MAX_VALUE, "INT32");
+        SinkRecord sinkRecord = recordSchemaTest(Integer.MAX_VALUE, Schema.INT32, Integer.MAX_VALUE, "INT32");
+        int out = (int) sinkRecord.value();
+        Assert.assertEquals(out, Integer.MAX_VALUE);
     }
 
     @Test
     public void longRecordSchemaTest() throws Exception {
-        recordSchemaTest(Long.MAX_VALUE, Schema.INT64, Long.MAX_VALUE, "INT64");
+        SinkRecord sinkRecord = recordSchemaTest(Long.MAX_VALUE, Schema.INT64, Long.MAX_VALUE, "INT64");
+        long out = (long) sinkRecord.value();
+        Assert.assertEquals(out, Long.MAX_VALUE);
     }
 
     @Test
     public void floatRecordSchemaTest() throws Exception {
         // 1.0d is coming back from ObjectMapper
-        recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, "FLOAT32");
+        SinkRecord sinkRecord = recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, "FLOAT32");
+        float out = (float) sinkRecord.value();
+        Assert.assertEquals(out, 1.0d);
     }
 
     @Test
     public void doubleRecordSchemaTest() throws Exception {
-        recordSchemaTest(Double.MAX_VALUE, Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64");
+        SinkRecord sinkRecord = recordSchemaTest(Double.MAX_VALUE, Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64");
+        double out = (double) sinkRecord.value();
+        Assert.assertEquals(out, Double.MAX_VALUE);
     }
 
     @Test
@@ -347,13 +377,45 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         obj.setField2("test");
         obj.setField3(100L);
 
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode jsonNode = mapper.valueToTree(obj);
+
         Map<String, Object> expected = new LinkedHashMap<>();
         expected.put("field1", 10);
         expected.put("field2", "test");
         // integer is coming back from ObjectMapper
         expected.put("field3", 100);
 
-        recordSchemaTest(obj, jsonSchema, expected, "STRUCT");
+        SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, expected, "STRUCT");
+
+        Struct out = (Struct) sinkRecord.value();
+        Assert.assertEquals((int)out.get("field1"), 10);
+        Assert.assertEquals((String)out.get("field2"), "test");
+        Assert.assertEquals((long)out.get("field3"), 100L);
+    }
+
+    @Test
+    public void avroSchemaTest() throws Exception {
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+        final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+        obj.put("field1", 10);
+        obj.put("field2", "test");
+        obj.put("field3", 100L);
+
+        Map<String, Object> expected = new LinkedHashMap<>();
+        expected.put("field1", 10);
+        expected.put("field2", "test");
+        // integer is coming back from ObjectMapper
+        expected.put("field3", 100);
+
+        SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, expected, "STRUCT");
+
+        Struct out = (Struct) sinkRecord.value();
+        Assert.assertEquals((int)out.get("field1"), 10);
+        Assert.assertEquals((String)out.get("field2"), "test");
+        Assert.assertEquals((long)out.get("field3"), 100L);
     }
 
     @Test
@@ -390,7 +452,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
     @Test
     public void KeyValueSchemaTest() throws Exception {
         KeyValue<Integer, String> kv = new KeyValue<>(11, "value");
-        recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING");
+        SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING");
+        String val = (String) sinkRecord.value();
+        Assert.assertEquals(val, "value");
+        int key = (int) sinkRecord.key();
+        Assert.assertEquals(key, 11);
     }
 
     @Test
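
The test-task change below flattens a top-level Struct into a Map so that
Jackson can serialize it for comparison. A recursive variant is sketched here
for illustration only (the committed test code deliberately stays
non-recursive, as its comment notes):

    // Hypothetical recursive flattening of nested Structs into Maps.
    static Object flatten(Object val) {
        if (val instanceof Struct) {
            Struct struct = (Struct) val;
            Map<String, Object> map = Maps.newHashMap();
            for (Field f : struct.schema().fields()) {
                map.put(f.name(), flatten(struct.get(f)));
            }
            return map;
        }
        return val;
    }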
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
index 91c0790..9821a58 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkTask.java
@@ -21,7 +21,9 @@ package org.apache.pulsar.io.kafka.connect;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.file.FileStreamSinkTask;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.testng.collections.Maps;
@@ -51,7 +53,19 @@ public class SchemaedFileStreamSinkTask extends FileStreamSinkTask {
             recOut.put("keySchema", record.keySchema().type().toString());
             recOut.put("valueSchema", record.valueSchema().type().toString());
             recOut.put("key", record.key());
-            recOut.put("value", val);
+            if (val instanceof Struct) {
+                Map<String, Object> map = Maps.newHashMap();
+                Struct struct = (Struct)val;
+
+                // no recursion needed for tests
+                for (Field f: struct.schema().fields()) {
+                    map.put(f.name(), struct.get(f));
+                }
+
+                recOut.put("value", map);
+            } else {
+                recOut.put("value", val);
+            }
 
             ObjectMapper om = new ObjectMapper();
             try {