You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/09 07:11:42 UTC

[23/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
new file mode 100644
index 0000000..0b1760b
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.errors.SchemaProjectorException;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class SchemaProjectorTest {
+
+    @Test
+    public void testPrimitiveTypeProjection() throws Exception {
+        // Identical required schemas on both sides: the value must come back unchanged.
+        Object result = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA);
+        assertEquals(false, result);
+
+        byte[] payload = {(byte) 1, (byte) 2};
+        result = SchemaProjector.project(Schema.BYTES_SCHEMA, payload, Schema.BYTES_SCHEMA);
+        assertEquals(payload, result);
+
+        result = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.STRING_SCHEMA);
+        assertEquals("abc", result);
+
+        result = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.OPTIONAL_BOOLEAN_SCHEMA);
+        assertEquals(false, result);
+
+        result = SchemaProjector.project(Schema.BYTES_SCHEMA, payload, Schema.OPTIONAL_BYTES_SCHEMA);
+        assertEquals(payload, result);
+
+        result = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.OPTIONAL_STRING_SCHEMA);
+        assertEquals("abc", result);
+
+        try {
+            SchemaProjector.project(Schema.OPTIONAL_BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA);
+            fail("Cannot project optional schema to schema with no default value.");
+        } catch (DataException expected) {
+            // expected: optional source, required target without a default
+        }
+
+        try {
+            SchemaProjector.project(Schema.OPTIONAL_BYTES_SCHEMA, payload, Schema.BYTES_SCHEMA);
+            fail("Cannot project optional schema to schema with no default value.");
+        } catch (DataException expected) {
+            // expected: optional source, required target without a default
+        }
+
+        try {
+            SchemaProjector.project(Schema.OPTIONAL_STRING_SCHEMA, "abc", Schema.STRING_SCHEMA);
+            fail("Cannot project optional schema to schema with no default value.");
+        } catch (DataException expected) {
+            // expected: optional source, required target without a default
+        }
+    }
+
+    @Test
+    public void testNumericTypeProjection() throws Exception {
+        // Each numeric type must project onto itself and every wider type; expectedProjected lists the results in target order.
+        Schema[] promotableSchemas = {Schema.INT8_SCHEMA, Schema.INT16_SCHEMA, Schema.INT32_SCHEMA, Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA, Schema.FLOAT64_SCHEMA};
+        Schema[] promotableOptionalSchemas = {Schema.OPTIONAL_INT8_SCHEMA, Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_FLOAT32_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA};
+
+        Object[] values = {(byte) 127, (short) 255, 32767, 327890L, 1.2F, 1.2345};
+        Map<Object, List<?>> expectedProjected = new HashMap<>();
+        expectedProjected.put(values[0], Arrays.asList((byte) 127, (short) 127, 127, 127L, 127.F, 127.));
+        expectedProjected.put(values[1], Arrays.asList((short) 255, 255, 255L, 255.F, 255.));
+        expectedProjected.put(values[2], Arrays.asList(32767, 32767L, 32767.F, 32767.));
+        expectedProjected.put(values[3], Arrays.asList(327890L, 327890.F, 327890.));
+        expectedProjected.put(values[4], Arrays.asList(1.2F, 1.2));
+        expectedProjected.put(values[5], Arrays.asList(1.2345));
+
+        for (int i = 0; i < promotableSchemas.length; ++i) {
+            Schema source = promotableSchemas[i];
+            List<?> expected = expectedProjected.get(values[i]);
+            for (int j = i; j < promotableSchemas.length; ++j) {
+                Schema target = promotableSchemas[j];
+                Object promoted = SchemaProjector.project(source, values[i], target);
+                if (target.type() == Type.FLOAT64) {
+                    assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
+                } else {
+                    assertEquals(expected.get(j - i), promoted);
+                }
+            }
+            for (int j = i; j < promotableOptionalSchemas.length;  ++j) {
+                Schema target = promotableOptionalSchemas[j];
+                Object promoted = SchemaProjector.project(source, values[i], target);
+                if (target.type() == Type.FLOAT64) {
+                    assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
+                } else {
+                    assertEquals(expected.get(j - i), promoted);
+                }
+            }
+        }
+
+        // Optional -> optional: the source must be the optional schema, otherwise this only repeats the required -> optional loop above.
+        for (int i = 0; i < promotableOptionalSchemas.length; ++i) {
+            Schema source = promotableOptionalSchemas[i];
+            List<?> expected = expectedProjected.get(values[i]);
+            for (int j = i; j < promotableOptionalSchemas.length;  ++j) {
+                Schema target = promotableOptionalSchemas[j];
+                Object promoted = SchemaProjector.project(source, values[i], target);
+                if (target.type() == Type.FLOAT64) {
+                    assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
+                } else {
+                    assertEquals(expected.get(j - i), promoted);
+                }
+            }
+        }
+
+        Schema[] nonPromotableSchemas = {Schema.BOOLEAN_SCHEMA, Schema.BYTES_SCHEMA, Schema.STRING_SCHEMA};
+        for (Schema promotableSchema: promotableSchemas) {
+            for (Schema nonPromotableSchema: nonPromotableSchemas) {
+                Object dummy = new Object();
+                try {
+                    SchemaProjector.project(promotableSchema, dummy, nonPromotableSchema);
+                    fail("Cannot promote " +  promotableSchema.type() + " to " + nonPromotableSchema.type());
+                } catch (DataException e) {
+                    // expected: numeric types never project onto BOOLEAN, BYTES or STRING
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testPrimitiveOptionalProjection() throws Exception {
+        verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, true);
+        verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, false);
+
+        byte[] bytes = {(byte) 1, (byte) 2};
+        byte[] defaultBytes = {(byte) 3, (byte) 4};
+        verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, true);
+        verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", true);
+        verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, false);
+        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, true);
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, false);
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, true);
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, true);
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, false);
+    }
+
+    @Test
+    public void testStructAddField() throws Exception {
+        Schema sourceSchema = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .build();
+        Struct sourceValue = new Struct(sourceSchema);
+        sourceValue.put("field", 1);
+
+        Schema widenedSchema = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", SchemaBuilder.int32().defaultValue(123).build())
+                .build();
+
+        Struct widened = (Struct) SchemaProjector.project(sourceSchema, sourceValue, widenedSchema);
+
+        // The shared field is carried over; the added field is filled from its schema default.
+        assertEquals(1, (int) widened.getInt32("field"));
+        assertEquals(123, (int) widened.getInt32("field2"));
+
+        Schema requiredFieldSchema = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", Schema.INT32_SCHEMA)
+                .build();
+
+        try {
+            SchemaProjector.project(sourceSchema, sourceValue, requiredFieldSchema);
+            fail("Incompatible schema.");
+        } catch (DataException expected) {
+            // expected: the added field is required and declares no default
+        }
+    }
+
+    @Test
+    public void testStructRemoveField() throws Exception {
+        Schema sourceSchema = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", Schema.INT32_SCHEMA)
+                .build();
+        Struct sourceValue = new Struct(sourceSchema);
+        sourceValue.put("field", 1);
+        sourceValue.put("field2", 234);
+
+        Schema narrowedSchema = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .build();
+        Struct narrowed = (Struct) SchemaProjector.project(sourceSchema, sourceValue, narrowedSchema);
+
+        assertEquals(1, narrowed.get("field"));
+        try {
+            narrowed.get("field2");
+            fail("field2 is not part of the projected struct");
+        } catch (DataException expected) {
+            // expected: the dropped field cannot be looked up on the projected struct
+        }
+    }
+
+    @Test
+    public void testStructDefaultValue() throws Exception {
+        Schema optionalSource = SchemaBuilder.struct().optional()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", Schema.INT32_SCHEMA)
+                .build();
+
+        SchemaBuilder targetBuilder = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", Schema.INT32_SCHEMA);
+
+        Struct fallback = new Struct(targetBuilder).put("field", 12).put("field2", 345);
+        targetBuilder.defaultValue(fallback);
+        Schema targetSchema = targetBuilder.build();
+        // A null source value yields the target default; a non-null one keeps its own field values.
+        Object projectedNull = SchemaProjector.project(optionalSource, null, targetSchema);
+        assertEquals(fallback, projectedNull);
+
+        Struct sourceValue = new Struct(optionalSource).put("field", 45).put("field2", 678);
+        Struct projectedValue = (Struct) SchemaProjector.project(optionalSource, sourceValue, targetSchema);
+
+        assertEquals(sourceValue.get("field"), projectedValue.get("field"));
+        assertEquals(sourceValue.get("field2"), projectedValue.get("field2"));
+    }
+
+    @Test
+    public void testNestedSchemaProjection() throws Exception {
+        Schema innerSourceSchema = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .build();
+        Schema innerTargetSchema = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", SchemaBuilder.int32().defaultValue(123).build())
+                .build();
+        Schema outerSourceSchema = SchemaBuilder.struct()
+                .field("first", Schema.INT32_SCHEMA)
+                .field("second", Schema.STRING_SCHEMA)
+                .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
+                .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build())
+                .field("nested", innerSourceSchema)
+                .build();
+        Schema outerTargetSchema = SchemaBuilder.struct()
+                .field("first", Schema.INT32_SCHEMA)
+                .field("second", Schema.STRING_SCHEMA)
+                .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
+                .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build())
+                .field("nested", innerTargetSchema)
+                .build();
+
+        Struct innerValue = new Struct(innerSourceSchema);
+        innerValue.put("field", 113);
+
+        Struct outerValue = new Struct(outerSourceSchema);
+        outerValue.put("first", 1);
+        outerValue.put("second", "abc");
+        outerValue.put("array", Arrays.asList(1, 2));
+        outerValue.put("map", Collections.singletonMap(5, "def"));
+        outerValue.put("nested", innerValue);
+
+        Struct projectedOuter = (Struct) SchemaProjector.project(outerSourceSchema, outerValue,
+                                                                 outerTargetSchema);
+        assertEquals(1, projectedOuter.get("first"));
+        assertEquals("abc", projectedOuter.get("second"));
+        assertEquals(Arrays.asList(1, 2), (List<Integer>) projectedOuter.get("array"));
+        assertEquals(Collections.singletonMap(5, "def"), (Map<Integer, String>) projectedOuter.get("map"));
+        // The nested struct is projected as well, so it gains the defaulted field2.
+        Struct projectedInner = (Struct) projectedOuter.get("nested");
+        assertEquals(113, projectedInner.get("field"));
+        assertEquals(123, projectedInner.get("field2"));
+    }
+
+    @Test
+    public void testLogicalTypeProjection() throws Exception {
+        Schema[] logicalSchemas = {Decimal.schema(2), Date.SCHEMA, Time.SCHEMA, Timestamp.SCHEMA};
+        // Projecting a logical type onto the identical logical schema must return the value unchanged.
+
+        BigDecimal decimalValue = new BigDecimal(new BigInteger("156"), 2);
+        Object result = SchemaProjector.project(Decimal.schema(2), decimalValue, Decimal.schema(2));
+        assertEquals(decimalValue, result);
+
+        result = SchemaProjector.project(Date.SCHEMA, 1000, Date.SCHEMA);
+        assertEquals(1000, result);
+
+        result = SchemaProjector.project(Time.SCHEMA, 231, Time.SCHEMA);
+        assertEquals(231, result);
+
+        result = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
+        assertEquals(34567L, result);
+
+        Schema unknownNameTarget = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
+        for (Schema logicalSchema: logicalSchemas) {
+            try {
+                SchemaProjector.project(logicalSchema, null, Schema.BOOLEAN_SCHEMA);
+                fail("Cannot project logical types to non-logical types.");
+            } catch (SchemaProjectorException expected) {
+                // expected: logical source, plain BOOLEAN target
+            }
+
+            try {
+                SchemaProjector.project(logicalSchema, null, unknownNameTarget);
+                fail("Reader name is not a valid logical type name.");
+            } catch (SchemaProjectorException expected) {
+                // expected: the target name matches none of the logical types
+            }
+
+            try {
+                SchemaProjector.project(Schema.BOOLEAN_SCHEMA, null, logicalSchema);
+                fail("Cannot project non-logical types to logical types.");
+            } catch (SchemaProjectorException expected) {
+                // expected: plain BOOLEAN source, logical target
+            }
+        }
+    }
+
+    @Test
+    public void testArrayProjection() throws Exception {
+        Schema source = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
+
+        Object projected = SchemaProjector.project(source, Arrays.asList(1, 2, 3), source);
+        assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected);
+
+        Schema optionalSource = SchemaBuilder.array(Schema.INT32_SCHEMA).optional().build();
+        Schema target = SchemaBuilder.array(Schema.INT32_SCHEMA).defaultValue(Arrays.asList(1, 2, 3)).build();
+        projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), target);
+        assertEquals(Arrays.asList(4, 5), (List<Integer>) projected);
+        projected = SchemaProjector.project(optionalSource, null, target);
+        assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected);
+
+        Schema promotedTarget = SchemaBuilder.array(Schema.INT64_SCHEMA).defaultValue(Arrays.asList(1L, 2L, 3L)).build();
+        projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), promotedTarget);
+        List<Long> expectedProjected = Arrays.asList(4L, 5L);
+        assertEquals(expectedProjected, (List<Long>) projected);
+        projected = SchemaProjector.project(optionalSource, null, promotedTarget);
+        assertEquals(Arrays.asList(1L, 2L, 3L), (List<Long>) projected);
+
+        Schema noDefaultValueTarget = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
+        try {
+            SchemaProjector.project(optionalSource, null, noDefaultValueTarget);
+            fail("Target schema does not provide a default value.");
+        } catch (SchemaProjectorException e) {
+            // expected
+        }
+        // Use the required source and real elements: a null from the optional source already fails for lack of a default (see above).
+        Schema nonPromotableTarget = SchemaBuilder.array(Schema.BOOLEAN_SCHEMA).build();
+        try {
+            SchemaProjector.project(source, Arrays.asList(1, 2, 3), nonPromotableTarget);
+            fail("Neither source type matches target type nor source type can be promoted to target type");
+        } catch (SchemaProjectorException e) {
+            // expected: INT32 elements cannot be promoted to BOOLEAN
+        }
+    }
+
+    @Test
+    public void testMapProjection() throws Exception {
+        Schema source = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).optional().build();
+
+        Schema target = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).defaultValue(Collections.singletonMap(1, 2)).build();
+        Object projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), target);
+        assertEquals(Collections.singletonMap(3, 4), (Map<Integer, Integer>) projected);
+        projected = SchemaProjector.project(source, null, target);
+        assertEquals(Collections.singletonMap(1, 2), (Map<Integer, Integer>) projected);
+
+        Schema promotedTarget = SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA).defaultValue(
+                Collections.singletonMap(3L, 4.5F)).build();
+        projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), promotedTarget);
+        assertEquals(Collections.singletonMap(3L, 4.F), (Map<Long, Float>) projected);
+        projected = SchemaProjector.project(source, null, promotedTarget);
+        assertEquals(Collections.singletonMap(3L, 4.5F), (Map<Long, Float>) projected);
+
+        Schema noDefaultValueTarget = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build();
+        try {
+            SchemaProjector.project(source, null, noDefaultValueTarget);
+            fail("Reader does not provide a default value.");
+        } catch (SchemaProjectorException e) {
+            // expected
+        }
+        // Use a required source and a real entry: a null from the optional source already fails for lack of a default (see above).
+        Schema nonPromotableTarget = SchemaBuilder.map(Schema.BOOLEAN_SCHEMA, Schema.STRING_SCHEMA).build();
+        try {
+            SchemaProjector.project(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), Collections.singletonMap(3, 4), nonPromotableTarget);
+            fail("Neither source type matches target type nor source type can be promoted to target type");
+        } catch (SchemaProjectorException e) {
+            // expected: INT32 keys cannot be promoted to BOOLEAN
+        }
+    }
+
+    @Test
+    public void testMaybeCompatible() throws Exception {
+        Schema source = SchemaBuilder.int32().name("source").build();
+        Schema target = SchemaBuilder.int32().name("target").build();
+
+        try {
+            SchemaProjector.project(source, 12, target);
+            fail("Source name and target name mismatch.");
+        } catch (SchemaProjectorException e) {
+            // expected
+        }
+        // Reuse the source's name so the parameters are the only difference; an unnamed target could fail on the name alone.
+        Schema targetWithParameters = SchemaBuilder.int32().name("source").parameters(Collections.singletonMap("key", "value")).build();
+        try {
+            SchemaProjector.project(source, 34, targetWithParameters);
+            fail("Source parameters and target parameters mismatch.");
+        } catch (SchemaProjectorException e) {
+            // expected: the source declares no parameters, the target declares one
+        }
+    }
+
+    // Projects first a non-null value and then null from an optional source onto a target of the given type that
+    // carries a default value, and checks both results; FLOAT64 targets are compared with a 1e-6 tolerance.
+    private void verifyOptionalProjection(Schema source, Type targetType, Object value, Object defaultValue, Object expectedProjected, boolean optional) {
+        assert source.isOptional();
+        assert value != null;
+        SchemaBuilder targetBuilder = SchemaBuilder.type(targetType);
+        if (optional) {
+            targetBuilder = targetBuilder.optional();
+        }
+        Schema target = targetBuilder.defaultValue(defaultValue).build();
+
+        Object fromValue = SchemaProjector.project(source, value, target);
+        if (targetType != Type.FLOAT64) {
+            assertEquals(expectedProjected, fromValue);
+        } else {
+            assertEquals((double) expectedProjected, (double) fromValue, 1e-6);
+        }
+
+        // A null input is expected to stay null for an optional target and to become the default otherwise.
+        Object fromNull = SchemaProjector.project(source, null, target);
+        Object expectedFromNull = optional ? null : defaultValue;
+        assertEquals(expectedFromNull, fromNull);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
new file mode 100644
index 0000000..c73992b
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class StructTest {
+
+    private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct()
+            .field("int8", Schema.INT8_SCHEMA)
+            .field("int16", Schema.INT16_SCHEMA)
+            .field("int32", Schema.INT32_SCHEMA)
+            .field("int64", Schema.INT64_SCHEMA)
+            .field("float32", Schema.FLOAT32_SCHEMA)
+            .field("float64", Schema.FLOAT64_SCHEMA)
+            .field("boolean", Schema.BOOLEAN_SCHEMA)
+            .field("string", Schema.STRING_SCHEMA)
+            .field("bytes", Schema.BYTES_SCHEMA)
+            .build();
+    // Container schemas (INT8 array, INT32 -> STRING map, one-field struct) and the struct that nests all three.
+    private static final Schema ARRAY_SCHEMA = SchemaBuilder.array(Schema.INT8_SCHEMA).build();
+    private static final Schema MAP_SCHEMA = SchemaBuilder.map(
+            Schema.INT32_SCHEMA,
+            Schema.STRING_SCHEMA
+    ).build();
+    private static final Schema NESTED_CHILD_SCHEMA = SchemaBuilder.struct()
+            .field("int8", Schema.INT8_SCHEMA)
+            .build();
+    private static final Schema NESTED_SCHEMA = SchemaBuilder.struct()
+            .field("array", ARRAY_SCHEMA)
+            .field("map", MAP_SCHEMA)
+            .field("nested", NESTED_CHILD_SCHEMA)
+            .build();
+    // INT8 field schemas for the missing-field validation tests: plain, optional, and with a default of 0.
+    private static final Schema REQUIRED_FIELD_SCHEMA = Schema.INT8_SCHEMA;
+    private static final Schema OPTIONAL_FIELD_SCHEMA = SchemaBuilder.int8().optional().build();
+    private static final Schema DEFAULT_FIELD_SCHEMA = SchemaBuilder.int8().defaultValue((byte) 0).build();
+
+    @Test
+    public void testFlatStruct() {
+        Struct flat = new Struct(FLAT_STRUCT_SCHEMA)
+                .put("int8", (byte) 12)
+                .put("int16", (short) 12)
+                .put("int32", 12)
+                .put("int64", 12L)
+                .put("float32", 12.f)
+                .put("float64", 12.)
+                .put("boolean", true)
+                .put("string", "foobar")
+                .put("bytes", "foobar".getBytes());
+
+        // Read every field back through its type-specific getter and compare with what was stored
+        assertEquals((byte) 12, (byte) flat.getInt8("int8"));
+        assertEquals((short) 12, (short) flat.getInt16("int16"));
+        assertEquals(12, (int) flat.getInt32("int32"));
+        assertEquals(12L, (long) flat.getInt64("int64"));
+        assertEquals((Float) 12.f, flat.getFloat32("float32"));
+        assertEquals((Double) 12., flat.getFloat64("float64"));
+        assertEquals(true, flat.getBoolean("boolean"));
+        assertEquals("foobar", flat.getString("string"));
+        assertEquals(ByteBuffer.wrap("foobar".getBytes()), ByteBuffer.wrap(flat.getBytes("bytes")));
+
+        flat.validate();
+    }
+
+    @Test
+    public void testComplexStruct() {
+        List<Byte> array = Arrays.asList((byte) 1, (byte) 2);
+        Map<Integer, String> map = Collections.singletonMap(1, "string");
+        Struct struct = new Struct(NESTED_SCHEMA)
+                .put("array", array)
+                .put("map", map)
+                .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12));
+
+        // Fetch the array and map into typed locals first, so the generic getters are exercised with the element types the schema declares
+        List<Byte> arrayExtracted = struct.getArray("array");
+        assertEquals(array, arrayExtracted);
+        Map<Integer, String> mapExtracted = struct.getMap("map");
+        assertEquals(map, mapExtracted);
+        assertEquals((byte) 12, struct.getStruct("nested").get("int8"));
+
+        struct.validate();
+    }
+
+
+    // These don't test all the ways validation can fail, just one for each element. See more extensive validation
+    // tests in SchemaTest. These are meant to ensure that we are invoking the same code path and that we do deeper
+    // inspection than just checking the class of the object
+
+    @Test(expected = DataException.class)
+    public void testInvalidFieldType() { // "int8" is declared with Schema.INT8_SCHEMA, so putting a String is expected to throw
+        new Struct(FLAT_STRUCT_SCHEMA).put("int8", "should fail because this is a string, not int8");
+    }
+
+    @Test(expected = DataException.class)
+    public void testInvalidArrayFieldElements() {
+        new Struct(NESTED_SCHEMA).put("array", Arrays.asList("should fail since elements should be int8s"));
+    }
+
+    @Test(expected = DataException.class)
+    public void testInvalidMapKeyElements() {
+        new Struct(NESTED_SCHEMA).put("map", Collections.singletonMap("should fail because keys should be int8s", (byte) 12));
+    }
+
+    @Test(expected = DataException.class)
+    public void testInvalidStructFieldSchema() {
+        new Struct(NESTED_SCHEMA).put("nested", new Struct(MAP_SCHEMA));
+    }
+
+    @Test(expected = DataException.class)
+    public void testInvalidStructFieldValue() {
+        new Struct(NESTED_SCHEMA).put("nested", new Struct(NESTED_CHILD_SCHEMA));
+    }
+
+
+    // validate() must throw a DataException when a field using REQUIRED_FIELD_SCHEMA was never set.
+    @Test(expected = DataException.class)
+    public void testMissingFieldValidation() {
+        // Required int8 field
+        Struct unpopulated = new Struct(SchemaBuilder.struct().field("field", REQUIRED_FIELD_SCHEMA).build());
+        unpopulated.validate();
+    }
+
+    // validate() must succeed when the only field uses OPTIONAL_FIELD_SCHEMA and was never set.
+    @Test
+    public void testMissingOptionalFieldValidation() {
+        Struct unpopulated = new Struct(SchemaBuilder.struct().field("field", OPTIONAL_FIELD_SCHEMA).build());
+        unpopulated.validate();
+    }
+
+    // validate() must succeed when the only field uses DEFAULT_FIELD_SCHEMA and was never set.
+    @Test
+    public void testMissingFieldWithDefaultValidation() {
+        Struct unpopulated = new Struct(SchemaBuilder.struct().field("field", DEFAULT_FIELD_SCHEMA).build());
+        unpopulated.validate();
+    }
+
+
+    // Checks Struct equality for a flat and a nested schema: structs with identical contents must
+    // be equal, and structs that differ in at least one field must not be.
+    @Test
+    public void testEquals() {
+        Struct struct1 = newFlatStruct("foobar");
+        Struct struct2 = newFlatStruct("foobar");
+        Struct struct3 = newFlatStruct("mismatching string");
+
+        assertEquals(struct1, struct2);
+        assertNotEquals(struct1, struct3);
+
+        // struct1 and struct2 are built from distinct but equal collection instances
+        struct1 = newNestedStruct(Arrays.asList((byte) 1, (byte) 2), Collections.singletonMap(1, "string"), (byte) 12);
+        struct2 = newNestedStruct(Arrays.asList((byte) 1, (byte) 2), Collections.singletonMap(1, "string"), (byte) 12);
+        // struct3 differs in the array contents, the map key and the nested int8 value
+        struct3 = newNestedStruct(Arrays.asList((byte) 1, (byte) 2, (byte) 3), Collections.singletonMap(2, "string"), (byte) 13);
+
+        assertEquals(struct1, struct2);
+        assertNotEquals(struct1, struct3);
+    }
+
+    // Builds a fully populated FLAT_STRUCT_SCHEMA struct that varies only in its "string" field.
+    private Struct newFlatStruct(String stringValue) {
+        return new Struct(FLAT_STRUCT_SCHEMA)
+                .put("int8", (byte) 12)
+                .put("int16", (short) 12)
+                .put("int32", 12)
+                .put("int64", (long) 12)
+                .put("float32", 12.f)
+                .put("float64", 12.)
+                .put("boolean", true)
+                .put("string", stringValue)
+                .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
+    }
+
+    // Builds a NESTED_SCHEMA struct from the given array, map and nested "int8" value.
+    private Struct newNestedStruct(List<Byte> array, Map<Integer, String> map, byte nestedInt8) {
+        return new Struct(NESTED_SCHEMA)
+                .put("array", array)
+                .put("map", map)
+                .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", nestedInt8));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java
new file mode 100644
index 0000000..45bdc4e
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Test;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the Time logical type: schema metadata, and conversion in both directions between
+ * java.util.Date values and millisecond counts.
+ */
+public class TimeTest {
+    private static final GregorianCalendar EPOCH = utcEpoch();
+    private static final GregorianCalendar EPOCH_PLUS_DATE_COMPONENT = utcEpoch();
+    private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_MILLIS = utcEpoch();
+    static {
+        EPOCH_PLUS_TEN_THOUSAND_MILLIS.add(Calendar.MILLISECOND, 10000);
+        // Moves the calendar 10000 days forward, so the value is no longer a pure time of day
+        EPOCH_PLUS_DATE_COMPONENT.add(Calendar.DATE, 10000);
+    }
+
+    // Returns a fresh calendar set to 1970-01-01 00:00:00 and switched to the UTC time zone.
+    private static GregorianCalendar utcEpoch() {
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
+        return calendar;
+    }
+
+    // The predefined schema must carry the logical name and version 1.
+    @Test
+    public void testBuilder() {
+        Schema timeSchema = Time.SCHEMA;
+        assertEquals(Time.LOGICAL_NAME, timeSchema.name());
+        assertEquals(1, (Object) timeSchema.version());
+    }
+
+    @Test
+    public void testFromLogical() {
+        assertEquals(0, Time.fromLogical(Time.SCHEMA, EPOCH.getTime()));
+        assertEquals(10000, Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime()));
+    }
+
+    // A schema whose name is not the Time logical name must be rejected.
+    @Test(expected = DataException.class)
+    public void testFromLogicalInvalidSchema() {
+        Schema wrongName = Time.builder().name("invalid").build();
+        Time.fromLogical(wrongName, EPOCH.getTime());
+    }
+
+    // A value that lies days after the epoch must be rejected.
+    @Test(expected = DataException.class)
+    public void testFromLogicalInvalidHasDateComponents() {
+        Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_DATE_COMPONENT.getTime());
+    }
+
+    @Test
+    public void testToLogical() {
+        assertEquals(EPOCH.getTime(), Time.toLogical(Time.SCHEMA, 0));
+        assertEquals(EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime(), Time.toLogical(Time.SCHEMA, 10000));
+    }
+
+    // A schema whose name is not the Time logical name must be rejected.
+    @Test(expected = DataException.class)
+    public void testToLogicalInvalidSchema() {
+        Schema wrongName = Time.builder().name("invalid").build();
+        Time.toLogical(wrongName, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java
new file mode 100644
index 0000000..6121160
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Test;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the Timestamp logical type: schema metadata, and conversion in both directions
+ * between java.util.Date values and millisecond counts.
+ */
+public class TimestampTest {
+    private static final GregorianCalendar EPOCH;
+    private static final GregorianCalendar EPOCH_PLUS_MILLIS;
+
+    private static final int NUM_MILLIS = 2000000000;
+    // Twice NUM_MILLIS, which does not fit in an int
+    private static final long TOTAL_MILLIS = ((long) NUM_MILLIS) * 2;
+
+    static {
+        EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        EPOCH.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+
+        EPOCH_PLUS_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        EPOCH_PLUS_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC"));
+        // Calendar.add() takes an int amount, so TOTAL_MILLIS is reached in two steps
+        EPOCH_PLUS_MILLIS.add(Calendar.MILLISECOND, NUM_MILLIS);
+        EPOCH_PLUS_MILLIS.add(Calendar.MILLISECOND, NUM_MILLIS);
+    }
+
+    // The predefined Timestamp schema must carry the Timestamp logical name and version 1.
+    @Test
+    public void testBuilder() {
+        Schema plain = Timestamp.SCHEMA;
+        assertEquals(Timestamp.LOGICAL_NAME, plain.name());
+        assertEquals(1, (Object) plain.version());
+    }
+
+    @Test
+    public void testFromLogical() {
+        assertEquals(0L, Timestamp.fromLogical(Timestamp.SCHEMA, EPOCH.getTime()));
+        assertEquals(TOTAL_MILLIS, Timestamp.fromLogical(Timestamp.SCHEMA, EPOCH_PLUS_MILLIS.getTime()));
+    }
+
+    // A schema whose name is not the Timestamp logical name must be rejected.
+    @Test(expected = DataException.class)
+    public void testFromLogicalInvalidSchema() {
+        Timestamp.fromLogical(Timestamp.builder().name("invalid").build(), EPOCH.getTime());
+    }
+
+    @Test
+    public void testToLogical() {
+        assertEquals(EPOCH.getTime(), Timestamp.toLogical(Timestamp.SCHEMA, 0L));
+        assertEquals(EPOCH_PLUS_MILLIS.getTime(), Timestamp.toLogical(Timestamp.SCHEMA, TOTAL_MILLIS));
+    }
+
+    // A schema whose name is not the Timestamp logical name must be rejected.
+    @Test(expected = DataException.class)
+    public void testToLogicalInvalidSchema() {
+        Timestamp.toLogical(Timestamp.builder().name("invalid").build(), 0L);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
new file mode 100644
index 0000000..017b2d3
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests for StringConverter: converting Connect values to bytes and bytes back to Strings, both
+ * with the converter left unconfigured and with "converter.encoding" set to UTF-16.
+ */
+public class StringConverterTest {
+    private static final String TOPIC = "topic";
+    private static final String SAMPLE_STRING = "a string";
+
+    private final StringConverter converter = new StringConverter();
+
+    @Test
+    public void testStringToBytes() throws UnsupportedEncodingException {
+        assertArrayEquals(SAMPLE_STRING.getBytes("UTF8"), converter.fromConnectData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING));
+    }
+
+    // Non-String values are expected to be serialized via their String form.
+    @Test
+    public void testNonStringToBytes() throws UnsupportedEncodingException {
+        assertArrayEquals("true".getBytes("UTF8"), converter.fromConnectData(TOPIC, Schema.BOOLEAN_SCHEMA, true));
+    }
+
+    @Test
+    public void testNullToBytes() {
+        assertEquals(null, converter.fromConnectData(TOPIC, Schema.OPTIONAL_STRING_SCHEMA, null));
+    }
+
+    @Test
+    public void testToBytesIgnoresSchema() throws UnsupportedEncodingException {
+        assertArrayEquals("true".getBytes("UTF8"), converter.fromConnectData(TOPIC, null, true));
+    }
+
+    @Test
+    public void testToBytesNonUtf8Encoding() throws UnsupportedEncodingException {
+        converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true);
+        assertArrayEquals(SAMPLE_STRING.getBytes("UTF-16"), converter.fromConnectData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING));
+    }
+
+    @Test
+    public void testBytesToString() throws UnsupportedEncodingException {
+        // Encode explicitly as UTF-8, the encoding the other tests expect from an unconfigured
+        // converter, so the input does not depend on the platform default charset.
+        SchemaAndValue data = converter.toConnectData(TOPIC, SAMPLE_STRING.getBytes("UTF8"));
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
+        assertEquals(SAMPLE_STRING, data.value());
+    }
+
+    @Test
+    public void testBytesNullToString() {
+        SchemaAndValue data = converter.toConnectData(TOPIC, null);
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
+        assertEquals(null, data.value());
+    }
+
+    @Test
+    public void testBytesToStringNonUtf8Encoding() throws UnsupportedEncodingException {
+        converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true);
+        SchemaAndValue data = converter.toConnectData(TOPIC, SAMPLE_STRING.getBytes("UTF-16"));
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
+        assertEquals(SAMPLE_STRING, data.value());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java b/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java
new file mode 100644
index 0000000..9f8338d
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for ConnectorUtils.groupPartitions(): splitting a list of five elements into a requested
+ * number of groups, and rejecting a group count of zero.
+ */
+public class ConnectorUtilsTest {
+
+    private static final List<Integer> FIVE_ELEMENTS = Arrays.asList(1, 2, 3, 4, 5);
+
+    @Test
+    public void testGroupPartitions() {
+
+        // A single group holds every element
+        List<List<Integer>> grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 1);
+        assertEquals(Arrays.asList(FIVE_ELEMENTS), grouped);
+
+        // Uneven splits put the extra elements into the leading groups
+        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 2);
+        assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)), grouped);
+
+        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 3);
+        assertEquals(Arrays.asList(Arrays.asList(1, 2),
+                Arrays.asList(3, 4),
+                Arrays.asList(5)), grouped);
+
+        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 5);
+        assertEquals(Arrays.asList(Arrays.asList(1),
+                Arrays.asList(2),
+                Arrays.asList(3),
+                Arrays.asList(4),
+                Arrays.asList(5)), grouped);
+
+        // More groups than elements: the surplus groups are empty. The typed emptyList() keeps
+        // the expected value a List<List<Integer>> instead of degrading to raw types.
+        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 7);
+        assertEquals(Arrays.asList(Arrays.asList(1),
+                Arrays.asList(2),
+                Arrays.asList(3),
+                Arrays.asList(4),
+                Arrays.asList(5),
+                Collections.<Integer>emptyList(),
+                Collections.<Integer>emptyList()), grouped);
+    }
+
+    // A group count of zero must be rejected with an IllegalArgumentException.
+    @Test(expected = IllegalArgumentException.class)
+    public void testGroupPartitionsInvalidCount() {
+        ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
new file mode 100644
index 0000000..a73153f
--- /dev/null
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Very simple sink connector that hands its optional 'file' setting to every task it configures.
+ * FileStreamSinkTask writes records to that file, or to stdout when no file is configured.
+ */
+public class FileStreamSinkConnector extends SinkConnector {
+    public static final String FILE_CONFIG = "file";
+
+    // Value of the 'file' setting captured in start(); null when the setting is absent
+    private String filename;
+
+    /** Reports the version obtained from AppInfoParser. */
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    /** Remembers the 'file' setting so it can be forwarded to the tasks. */
+    @Override
+    public void start(Map<String, String> props) {
+        this.filename = props.get(FILE_CONFIG);
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return FileStreamSinkTask.class;
+    }
+
+    /**
+     * Produces one configuration per task, maxTasks in total. Each configuration carries the
+     * 'file' setting when one was provided and is empty otherwise.
+     */
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> taskConfigs = new ArrayList<>();
+        for (int remaining = maxTasks; remaining > 0; remaining--) {
+            Map<String, String> taskConfig = new HashMap<>();
+            if (filename != null) {
+                taskConfig.put(FILE_CONFIG, filename);
+            }
+            taskConfigs.add(taskConfig);
+        }
+        return taskConfigs;
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to do since FileStreamSinkConnector has no background monitoring.
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
new file mode 100644
index 0000000..83ba6d4
--- /dev/null
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * FileStreamSinkTask writes records to stdout or a file.
+ */
+public class FileStreamSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(FileStreamSinkTask.class);
+
+    // Target file name; null means output goes to stdout (or to the stream injected for tests)
+    private String filename;
+    // Destination for record values; stays null until start() succeeds unless injected for tests
+    private PrintStream outputStream;
+
+    public FileStreamSinkTask() {
+    }
+
+    // for testing
+    public FileStreamSinkTask(PrintStream outputStream) {
+        filename = null;
+        this.outputStream = outputStream;
+    }
+
+    @Override
+    public String version() {
+        return new FileStreamSinkConnector().version();
+    }
+
+    /**
+     * Opens the output destination: stdout when no 'file' setting is present, otherwise the
+     * named file in append mode.
+     *
+     * @throws ConnectException if the file cannot be found or created
+     */
+    @Override
+    public void start(Map<String, String> props) {
+        filename = props.get(FileStreamSinkConnector.FILE_CONFIG);
+        if (filename == null) {
+            outputStream = System.out;
+        } else {
+            try {
+                // 'true' opens the file for appending instead of truncating it
+                outputStream = new PrintStream(new FileOutputStream(filename, true));
+            } catch (FileNotFoundException e) {
+                throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
+            }
+        }
+    }
+
+    /** Writes the value of every record on its own line. */
+    @Override
+    public void put(Collection<SinkRecord> sinkRecords) {
+        for (SinkRecord record : sinkRecords) {
+            log.trace("Writing line to {}: {}", logFilename(), record.value());
+            outputStream.println(record.value());
+        }
+    }
+
+    /** Flushes the output stream; the supplied offsets are not used. */
+    @Override
+    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        log.trace("Flushing output stream for {}", logFilename());
+        outputStream.flush();
+    }
+
+    /** Closes the output stream unless it is stdout or was never opened. */
+    @Override
+    public void stop() {
+        // outputStream is still null when start() was never called or failed to open the file.
+        // System.out is shared with the rest of the JVM and must not be closed here.
+        if (outputStream != null && outputStream != System.out)
+            outputStream.close();
+    }
+
+    // Name used in log messages for the current destination
+    private String logFilename() {
+        return filename == null ? "stdout" : filename;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
new file mode 100644
index 0000000..843e999
--- /dev/null
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Very simple source connector that requires a single 'topic' setting and passes it, together
+ * with the optional 'file' setting, to one FileStreamSourceTask, which reads from stdin or a file.
+ */
+public class FileStreamSourceConnector extends SourceConnector {
+    public static final String TOPIC_CONFIG = "topic";
+    public static final String FILE_CONFIG = "file";
+
+    private String filename;
+    private String topic;
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        filename = props.get(FILE_CONFIG);
+        topic = props.get(TOPIC_CONFIG);
+        if (topic == null || topic.isEmpty())
+            throw new ConnectException("FileStreamSourceConnector configuration must include 'topic' setting");
+        if (topic.contains(","))
+            throw new ConnectException("FileStreamSourceConnector should only have a single topic when used as a source.");
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return FileStreamSourceTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        ArrayList<Map<String, String>> configs = new ArrayList<>();
+        // Only one input stream makes sense.
+        Map<String, String> config = new HashMap<>();
+        if (filename != null)
+            config.put(FILE_CONFIG, filename);
+        config.put(TOPIC_CONFIG, topic);
+        configs.add(config);
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to do since FileStreamSourceConnector has no background monitoring.
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
new file mode 100644
index 0000000..f30b603
--- /dev/null
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * FileStreamSourceTask reads from stdin or a file.
+ */
+public class FileStreamSourceTask extends SourceTask {
+    private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
+    public static final String FILENAME_FIELD = "filename";
+    public  static final String POSITION_FIELD = "position";
+    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
+
+    private String filename;
+    private InputStream stream;
+    private BufferedReader reader = null;
+    private char[] buffer = new char[1024];
+    private int offset = 0;
+    private String topic = null;
+
+    private Long streamOffset;
+
+    @Override
+    public String version() {
+        return new FileStreamSourceConnector().version();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            stream = System.in;
+            // Tracking offset for stdin doesn't make sense
+            streamOffset = null;
+            reader = new BufferedReader(new InputStreamReader(stream));
+        }
+        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
+        if (topic == null)
+            throw new ConnectException("FileStreamSourceTask config missing topic setting");
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        if (stream == null) {
+            try {
+                stream = new FileInputStream(filename);
+                Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
+                if (offset != null) {
+                    Object lastRecordedOffset = offset.get(POSITION_FIELD);
+                    if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long))
+                        throw new ConnectException("Offset position is the incorrect type");
+                    if (lastRecordedOffset != null) {
+                        log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
+                        long skipLeft = (Long) lastRecordedOffset;
+                        while (skipLeft > 0) {
+                            try {
+                                long skipped = stream.skip(skipLeft);
+                                skipLeft -= skipped;
+                            } catch (IOException e) {
+                                log.error("Error while trying to seek to previous offset in file: ", e);
+                                throw new ConnectException(e);
+                            }
+                        }
+                        log.debug("Skipped to offset {}", lastRecordedOffset);
+                    }
+                    streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
+                } else {
+                    streamOffset = 0L;
+                }
+                reader = new BufferedReader(new InputStreamReader(stream));
+                log.debug("Opened {} for reading", logFilename());
+            } catch (FileNotFoundException e) {
+                log.warn("Couldn't find file for FileStreamSourceTask, sleeping to wait for it to be created");
+                synchronized (this) {
+                    this.wait(1000);
+                }
+                return null;
+            }
+        }
+
+        // Unfortunately we can't just use readLine() because it blocks in an uninterruptible way.
+        // Instead we have to manage splitting lines ourselves, using simple backoff when no new data
+        // is available.
+        try {
+            final BufferedReader readerCopy;
+            synchronized (this) {
+                readerCopy = reader;
+            }
+            if (readerCopy == null)
+                return null;
+
+            ArrayList<SourceRecord> records = null;
+
+            int nread = 0;
+            while (readerCopy.ready()) {
+                nread = readerCopy.read(buffer, offset, buffer.length - offset);
+                log.trace("Read {} bytes from {}", nread, logFilename());
+
+                if (nread > 0) {
+                    offset += nread;
+                    if (offset == buffer.length) {
+                        char[] newbuf = new char[buffer.length * 2];
+                        System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
+                        buffer = newbuf;
+                    }
+
+                    String line;
+                    do {
+                        line = extractLine();
+                        if (line != null) {
+                            log.trace("Read a line from {}", logFilename());
+                            if (records == null)
+                                records = new ArrayList<>();
+                            records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line));
+                        }
+                        new ArrayList<SourceRecord>();
+                    } while (line != null);
+                }
+            }
+
+            if (nread <= 0)
+                synchronized (this) {
+                    this.wait(1000);
+                }
+
+            return records;
+        } catch (IOException e) {
+            // Underlying stream was killed, probably as a result of calling stop. Allow to return
+            // null, and driving thread will handle any shutdown if necessary.
+        }
+        return null;
+    }
+
+    private String extractLine() {
+        int until = -1, newStart = -1;
+        for (int i = 0; i < offset; i++) {
+            if (buffer[i] == '\n') {
+                until = i;
+                newStart = i + 1;
+                break;
+            } else if (buffer[i] == '\r') {
+                // We need to check for \r\n, so we must skip this if we can't check the next char
+                if (i + 1 >= offset)
+                    return null;
+
+                until = i;
+                newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
+                break;
+            }
+        }
+
+        if (until != -1) {
+            String result = new String(buffer, 0, until);
+            System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
+            offset = offset - newStart;
+            if (streamOffset != null)
+                streamOffset += newStart;
+            return result;
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public void stop() {
+        log.trace("Stopping");
+        synchronized (this) {
+            try {
+                if (stream != null && stream != System.in) {
+                    stream.close();
+                    log.trace("Closed input stream");
+                }
+            } catch (IOException e) {
+                log.error("Failed to close FileStreamSourceTask stream: ", e);
+            }
+            this.notify();
+        }
+    }
+
+    private Map<String, String> offsetKey(String filename) {
+        return Collections.singletonMap(FILENAME_FIELD, filename);
+    }
+
+    private Map<String, Long> offsetValue(Long pos) {
+        return Collections.singletonMap(POSITION_FIELD, pos);
+    }
+
+    private String logFilename() {
+        return filename == null ? "stdin" : filename;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
new file mode 100644
index 0000000..5ed03f4
--- /dev/null
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class FileStreamSinkConnectorTest {
+
+    private static final String MULTIPLE_TOPICS = "test1,test2";
+    private static final String[] MULTIPLE_TOPICS_LIST
+            = MULTIPLE_TOPICS.split(",");
+    private static final List<TopicPartition> MULTIPLE_TOPICS_PARTITIONS = Arrays.asList(
+            new TopicPartition("test1", 1), new TopicPartition("test2", 2)
+    );
+    private static final String FILENAME = "/afilename";
+
+    private FileStreamSinkConnector connector;
+    private ConnectorContext ctx;
+    private Map<String, String> sinkProperties;
+
+    /**
+     * Creates a connector initialized with a mocked context, and a configuration that names
+     * two topics and an output file.
+     */
+    @Before
+    public void setup() {
+        connector = new FileStreamSinkConnector();
+        ctx = PowerMock.createMock(ConnectorContext.class);
+        connector.initialize(ctx);
+
+        sinkProperties = new HashMap<>();
+        sinkProperties.put(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS);
+        sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, FILENAME);
+    }
+
+    /**
+     * The connector must hand out as many task configs as requested, each one carrying the
+     * configured file name.
+     */
+    @Test
+    public void testSinkTasks() {
+        PowerMock.replayAll();
+
+        connector.start(sinkProperties);
+        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
+        assertEquals(1, taskConfigs.size());
+        assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
+
+        taskConfigs = connector.taskConfigs(2);
+        assertEquals(2, taskConfigs.size());
+        // Check every returned config, not just the first one.
+        for (int i = 0; i < taskConfigs.size(); i++) {
+            assertEquals(FILENAME, taskConfigs.get(i).get(FileStreamSinkConnector.FILE_CONFIG));
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * The connector must report FileStreamSinkTask as its task implementation.
+     */
+    @Test
+    public void testTaskClass() {
+        PowerMock.replayAll();
+
+        connector.start(sinkProperties);
+        assertEquals(FileStreamSinkTask.class, connector.taskClass());
+
+        PowerMock.verifyAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
new file mode 100644
index 0000000..754e7f5
--- /dev/null
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class FileStreamSinkTaskTest {
+
+    private FileStreamSinkTask task;
+    private ByteArrayOutputStream os;
+    private PrintStream printStream;
+
+    /**
+     * Gives the task an in-memory output stream so the test can inspect what was written.
+     */
+    @Before
+    public void setup() {
+        os = new ByteArrayOutputStream();
+        printStream = new PrintStream(os);
+        task = new FileStreamSinkTask(printStream);
+    }
+
+    /**
+     * Records handed to put() must show up in the output, one per line, after flush().
+     */
+    @Test
+    public void testPutFlush() {
+        // task.start() is deliberately not called: it would override the output stream.
+        final TopicPartition firstPartition = new TopicPartition("topic1", 0);
+        final TopicPartition secondPartition = new TopicPartition("topic2", 0);
+        final HashMap<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
+
+        // First batch: a single record.
+        task.put(Arrays.asList(stringRecord("topic1", "line1", 1)));
+        committed.put(firstPartition, new OffsetAndMetadata(1L));
+        task.flush(committed);
+        assertEquals("line1\n", os.toString());
+
+        // Second batch: two records from different topics, appended to the earlier output.
+        task.put(Arrays.asList(
+                stringRecord("topic1", "line2", 2),
+                stringRecord("topic2", "line3", 1)
+        ));
+        committed.put(firstPartition, new OffsetAndMetadata(2L));
+        committed.put(secondPartition, new OffsetAndMetadata(1L));
+        task.flush(committed);
+        assertEquals("line1\nline2\nline3\n", os.toString());
+    }
+
+    /** Builds a keyless string record on partition 0 of the given topic. */
+    private static SinkRecord stringRecord(String topic, String value, long kafkaOffset) {
+        return new SinkRecord(topic, 0, null, null, Schema.STRING_SCHEMA, value, kafkaOffset);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
new file mode 100644
index 0000000..80ff7f5
--- /dev/null
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class FileStreamSourceConnectorTest {
+
+    private static final String SINGLE_TOPIC = "test";
+    private static final String MULTIPLE_TOPICS = "test1,test2";
+    private static final String FILENAME = "/somefilename";
+
+    private FileStreamSourceConnector connector;
+    private ConnectorContext ctx;
+    private Map<String, String> sourceProperties;
+
+    /**
+     * Creates a connector initialized with a mocked context, and a configuration that names
+     * one topic and an input file.
+     */
+    @Before
+    public void setup() {
+        connector = new FileStreamSourceConnector();
+        ctx = PowerMock.createMock(ConnectorContext.class);
+        connector.initialize(ctx);
+
+        sourceProperties = new HashMap<>();
+        sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC);
+        sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, FILENAME);
+    }
+
+    /**
+     * Exactly one task config is produced, whether one or two tasks are requested.
+     */
+    @Test
+    public void testSourceTasks() {
+        PowerMock.replayAll();
+
+        connector.start(sourceProperties);
+        assertSingleFileTask(connector.taskConfigs(1));
+
+        // Should be able to return fewer than requested #
+        assertSingleFileTask(connector.taskConfigs(2));
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Without a file setting the single task config must not contain a file entry.
+     */
+    @Test
+    public void testSourceTasksStdin() {
+        PowerMock.replayAll();
+
+        sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        connector.start(sourceProperties);
+        final List<Map<String, String>> stdinConfigs = connector.taskConfigs(1);
+        assertEquals(1, stdinConfigs.size());
+        final Map<String, String> onlyConfig = stdinConfigs.get(0);
+        assertNull(onlyConfig.get(FileStreamSourceConnector.FILE_CONFIG));
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * A comma separated topic list must be rejected when the connector starts.
+     */
+    @Test(expected = ConnectException.class)
+    public void testMultipleSourcesInvalid() {
+        sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
+        connector.start(sourceProperties);
+    }
+
+    /**
+     * The connector must report FileStreamSourceTask as its task implementation.
+     */
+    @Test
+    public void testTaskClass() {
+        PowerMock.replayAll();
+
+        connector.start(sourceProperties);
+        assertEquals(FileStreamSourceTask.class, connector.taskClass());
+
+        PowerMock.verifyAll();
+    }
+
+    /** Asserts that there is one task config and that it carries the file and the topic. */
+    private static void assertSingleFileTask(List<Map<String, String>> taskConfigs) {
+        assertEquals(1, taskConfigs.size());
+        final Map<String, String> onlyConfig = taskConfigs.get(0);
+        assertEquals(FILENAME,
+                onlyConfig.get(FileStreamSourceConnector.FILE_CONFIG));
+        assertEquals(SINGLE_TOPIC,
+                onlyConfig.get(FileStreamSourceConnector.TOPIC_CONFIG));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
new file mode 100644
index 0000000..3689313
--- /dev/null
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class FileStreamSourceTaskTest {
+
+    private static final String TOPIC = "test";
+
+    private File tempFile;
+    private Map<String, String> config;
+    private OffsetStorageReader offsetStorageReader;
+    private SourceTaskContext context;
+    private FileStreamSourceTask task;
+
+    // Set by replay(); teardown only verifies the mocks for tests that replayed them.
+    private boolean verifyMocks = false;
+
+    /**
+     * Creates a temporary input file, a configuration pointing at it, and a task initialized
+     * with a mocked context.
+     */
+    @Before
+    public void setup() throws IOException {
+        tempFile = File.createTempFile("file-stream-source-task-test", null);
+        config = new HashMap<>();
+        config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
+        config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
+        task = new FileStreamSourceTask();
+        offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
+        context = PowerMock.createMock(SourceTaskContext.class);
+        task.initialize(context);
+    }
+
+    /**
+     * Removes the temporary file and verifies the mocks if the test replayed them.
+     */
+    @After
+    public void teardown() {
+        tempFile.delete();
+
+        if (verifyMocks)
+            PowerMock.verifyAll();
+    }
+
+    private void replay() {
+        PowerMock.replayAll();
+        verifyMocks = true;
+    }
+
+    /**
+     * Writes to the file in several steps and checks that the task emits a record only once a
+     * line is complete, for all supported line endings, with the expected positions.
+     */
+    @Test
+    public void testNormalLifecycle() throws InterruptedException, IOException {
+        expectOffsetLookupReturnNone();
+        replay();
+
+        task.start(config);
+
+        // try-with-resources so the writer is closed even when an assertion fails.
+        try (FileOutputStream os = new FileOutputStream(tempFile)) {
+            assertEquals(null, task.poll());
+            os.write("partial line".getBytes());
+            os.flush();
+            assertEquals(null, task.poll());
+            os.write(" finished\n".getBytes());
+            os.flush();
+            List<SourceRecord> records = task.poll();
+            assertEquals(1, records.size());
+            assertEquals(TOPIC, records.get(0).topic());
+            assertEquals("partial line finished", records.get(0).value());
+            assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
+            assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 22L), records.get(0).sourceOffset());
+            assertEquals(null, task.poll());
+
+            // Different line endings, and make sure the final \r doesn't result in a line until we can
+            // read the subsequent byte.
+            os.write("line1\rline2\r\nline3\nline4\n\r".getBytes());
+            os.flush();
+            records = task.poll();
+            assertEquals(4, records.size());
+            assertEquals("line1", records.get(0).value());
+            assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
+            assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 28L), records.get(0).sourceOffset());
+            assertEquals("line2", records.get(1).value());
+            assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(1).sourcePartition());
+            assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 35L), records.get(1).sourceOffset());
+            assertEquals("line3", records.get(2).value());
+            assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(2).sourcePartition());
+            assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 41L), records.get(2).sourceOffset());
+            assertEquals("line4", records.get(3).value());
+            assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(3).sourcePartition());
+            assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 47L), records.get(3).sourceOffset());
+
+            // The pending \r can now be resolved: it terminates an empty line.
+            os.write("subsequent text".getBytes());
+            os.flush();
+            records = task.poll();
+            assertEquals(1, records.size());
+            assertEquals("", records.get(0).value());
+            assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
+            assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset());
+
+            task.stop();
+        }
+    }
+
+    /**
+     * Starting the task without a topic must fail.
+     */
+    @Test(expected = ConnectException.class)
+    public void testMissingTopic() throws InterruptedException {
+        replay();
+
+        config.remove(FileStreamSourceConnector.TOPIC_CONFIG);
+        task.start(config);
+    }
+
+    /**
+     * Polling a file that does not exist must return no data.
+     */
+    @Test
+    public void testInvalidFile() throws InterruptedException {
+        config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
+        task.start(config);
+        // Currently the task retries indefinitely if the file isn't found, but shouldn't return any data.
+        // Every poll of a missing file waits about a second, so only a few rounds are run.
+        for (int i = 0; i < 3; i++)
+            assertEquals(null, task.poll());
+    }
+
+
+    /** Expects one offset lookup through the context and answers it with "no stored offset". */
+    private void expectOffsetLookupReturnNone() {
+        EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
+        EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null);
+    }
+}
\ No newline at end of file