You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/09 07:11:42 UTC
[23/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
new file mode 100644
index 0000000..0b1760b
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.errors.SchemaProjectorException;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class SchemaProjectorTest {
+
+ @Test
+ public void testPrimitiveTypeProjection() throws Exception {
+ Object projected;
+ projected = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA);
+ assertEquals(false, projected);
+
+ byte[] bytes = {(byte) 1, (byte) 2};
+ projected = SchemaProjector.project(Schema.BYTES_SCHEMA, bytes, Schema.BYTES_SCHEMA);
+ assertEquals(bytes, projected);
+
+ projected = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.STRING_SCHEMA);
+ assertEquals("abc", projected);
+
+ projected = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.OPTIONAL_BOOLEAN_SCHEMA);
+ assertEquals(false, projected);
+
+ projected = SchemaProjector.project(Schema.BYTES_SCHEMA, bytes, Schema.OPTIONAL_BYTES_SCHEMA);
+ assertEquals(bytes, projected);
+
+ projected = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.OPTIONAL_STRING_SCHEMA);
+ assertEquals("abc", projected);
+
+ try {
+ SchemaProjector.project(Schema.OPTIONAL_BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA);
+ fail("Cannot project optional schema to schema with no default value.");
+ } catch (DataException e) {
+ // expected
+ }
+
+ try {
+ SchemaProjector.project(Schema.OPTIONAL_BYTES_SCHEMA, bytes, Schema.BYTES_SCHEMA);
+ fail("Cannot project optional schema to schema with no default value.");
+ } catch (DataException e) {
+ // expected
+ }
+
+ try {
+ SchemaProjector.project(Schema.OPTIONAL_STRING_SCHEMA, "abc", Schema.STRING_SCHEMA);
+ fail("Cannot project optional schema to schema with no default value.");
+ } catch (DataException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testNumericTypeProjection() throws Exception {
+ Schema[] promotableSchemas = {Schema.INT8_SCHEMA, Schema.INT16_SCHEMA, Schema.INT32_SCHEMA, Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA, Schema.FLOAT64_SCHEMA};
+ Schema[] promotableOptionalSchemas = {Schema.OPTIONAL_INT8_SCHEMA, Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA,
+ Schema.OPTIONAL_FLOAT32_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA};
+
+ Object[] values = {(byte) 127, (short) 255, 32767, 327890L, 1.2F, 1.2345};
+ Map<Object, List<?>> expectedProjected = new HashMap<>();
+ expectedProjected.put(values[0], Arrays.asList((byte) 127, (short) 127, 127, 127L, 127.F, 127.));
+ expectedProjected.put(values[1], Arrays.asList((short) 255, 255, 255L, 255.F, 255.));
+ expectedProjected.put(values[2], Arrays.asList(32767, 32767L, 32767.F, 32767.));
+ expectedProjected.put(values[3], Arrays.asList(327890L, 327890.F, 327890.));
+ expectedProjected.put(values[4], Arrays.asList(1.2F, 1.2));
+ expectedProjected.put(values[5], Arrays.asList(1.2345));
+
+ Object promoted;
+ for (int i = 0; i < promotableSchemas.length; ++i) {
+ Schema source = promotableSchemas[i];
+ List<?> expected = expectedProjected.get(values[i]);
+ for (int j = i; j < promotableSchemas.length; ++j) {
+ Schema target = promotableSchemas[j];
+ promoted = SchemaProjector.project(source, values[i], target);
+ if (target.type() == Type.FLOAT64) {
+ assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
+ } else {
+ assertEquals(expected.get(j - i), promoted);
+ }
+ }
+ for (int j = i; j < promotableOptionalSchemas.length; ++j) {
+ Schema target = promotableOptionalSchemas[j];
+ promoted = SchemaProjector.project(source, values[i], target);
+ if (target.type() == Type.FLOAT64) {
+ assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
+ } else {
+ assertEquals(expected.get(j - i), promoted);
+ }
+ }
+ }
+
+ for (int i = 0; i < promotableOptionalSchemas.length; ++i) {
+ Schema source = promotableSchemas[i];
+ List<?> expected = expectedProjected.get(values[i]);
+ for (int j = i; j < promotableOptionalSchemas.length; ++j) {
+ Schema target = promotableOptionalSchemas[j];
+ promoted = SchemaProjector.project(source, values[i], target);
+ if (target.type() == Type.FLOAT64) {
+ assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
+ } else {
+ assertEquals(expected.get(j - i), promoted);
+ }
+ }
+ }
+
+ Schema[] nonPromotableSchemas = {Schema.BOOLEAN_SCHEMA, Schema.BYTES_SCHEMA, Schema.STRING_SCHEMA};
+ for (Schema promotableSchema: promotableSchemas) {
+ for (Schema nonPromotableSchema: nonPromotableSchemas) {
+ Object dummy = new Object();
+ try {
+ SchemaProjector.project(promotableSchema, dummy, nonPromotableSchema);
+ fail("Cannot promote " + promotableSchema.type() + " to " + nonPromotableSchema.type());
+ } catch (DataException e) {
+ // expected
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testPrimitiveOptionalProjection() throws Exception {
+ verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, true);
+ verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, false);
+
+ byte[] bytes = {(byte) 1, (byte) 2};
+ byte[] defaultBytes = {(byte) 3, (byte) 4};
+ verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, true);
+ verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, false);
+
+ verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", true);
+ verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", false);
+
+ verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., false);
+
+ verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., false);
+
+ verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., false);
+
+ verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, false);
+ verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., true);
+ verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., false);
+
+ verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, true);
+ verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, false);
+ verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, true);
+ verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, false);
+
+ verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, true);
+ verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, false);
+ }
+
+ @Test
+ public void testStructAddField() throws Exception {
+ Schema source = SchemaBuilder.struct()
+ .field("field", Schema.INT32_SCHEMA)
+ .build();
+ Struct sourceStruct = new Struct(source);
+ sourceStruct.put("field", 1);
+
+ Schema target = SchemaBuilder.struct()
+ .field("field", Schema.INT32_SCHEMA)
+ .field("field2", SchemaBuilder.int32().defaultValue(123).build())
+ .build();
+
+ Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
+
+
+ assertEquals(1, (int) targetStruct.getInt32("field"));
+ assertEquals(123, (int) targetStruct.getInt32("field2"));
+
+ Schema incompatibleTargetSchema = SchemaBuilder.struct()
+ .field("field", Schema.INT32_SCHEMA)
+ .field("field2", Schema.INT32_SCHEMA)
+ .build();
+
+ try {
+ SchemaProjector.project(source, sourceStruct, incompatibleTargetSchema);
+ fail("Incompatible schema.");
+ } catch (DataException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testStructRemoveField() throws Exception {
+ Schema source = SchemaBuilder.struct()
+ .field("field", Schema.INT32_SCHEMA)
+ .field("field2", Schema.INT32_SCHEMA)
+ .build();
+ Struct sourceStruct = new Struct(source);
+ sourceStruct.put("field", 1);
+ sourceStruct.put("field2", 234);
+
+ Schema target = SchemaBuilder.struct()
+ .field("field", Schema.INT32_SCHEMA)
+ .build();
+ Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
+
+ assertEquals(1, targetStruct.get("field"));
+ try {
+ targetStruct.get("field2");
+ fail("field2 is not part of the projected struct");
+ } catch (DataException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testStructDefaultValue() throws Exception {
+ Schema source = SchemaBuilder.struct().optional()
+ .field("field", Schema.INT32_SCHEMA)
+ .field("field2", Schema.INT32_SCHEMA)
+ .build();
+
+ SchemaBuilder builder = SchemaBuilder.struct()
+ .field("field", Schema.INT32_SCHEMA)
+ .field("field2", Schema.INT32_SCHEMA);
+
+ Struct defaultStruct = new Struct(builder).put("field", 12).put("field2", 345);
+ builder.defaultValue(defaultStruct);
+ Schema target = builder.build();
+
+ Object projected = SchemaProjector.project(source, null, target);
+ assertEquals(defaultStruct, projected);
+
+ Struct sourceStruct = new Struct(source).put("field", 45).put("field2", 678);
+ Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
+
+ assertEquals(sourceStruct.get("field"), targetStruct.get("field"));
+ assertEquals(sourceStruct.get("field2"), targetStruct.get("field2"));
+ }
+
+ @Test
+ public void testNestedSchemaProjection() throws Exception {
+ Schema sourceFlatSchema = SchemaBuilder.struct()
+ .field("field", Schema.INT32_SCHEMA)
+ .build();
+ Schema targetFlatSchema = SchemaBuilder.struct()
+ .field("field", Schema.INT32_SCHEMA)
+ .field("field2", SchemaBuilder.int32().defaultValue(123).build())
+ .build();
+ Schema sourceNestedSchema = SchemaBuilder.struct()
+ .field("first", Schema.INT32_SCHEMA)
+ .field("second", Schema.STRING_SCHEMA)
+ .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
+ .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build())
+ .field("nested", sourceFlatSchema)
+ .build();
+ Schema targetNestedSchema = SchemaBuilder.struct()
+ .field("first", Schema.INT32_SCHEMA)
+ .field("second", Schema.STRING_SCHEMA)
+ .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
+ .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build())
+ .field("nested", targetFlatSchema)
+ .build();
+
+ Struct sourceFlatStruct = new Struct(sourceFlatSchema);
+ sourceFlatStruct.put("field", 113);
+
+ Struct sourceNestedStruct = new Struct(sourceNestedSchema);
+ sourceNestedStruct.put("first", 1);
+ sourceNestedStruct.put("second", "abc");
+ sourceNestedStruct.put("array", Arrays.asList(1, 2));
+ sourceNestedStruct.put("map", Collections.singletonMap(5, "def"));
+ sourceNestedStruct.put("nested", sourceFlatStruct);
+
+ Struct targetNestedStruct = (Struct) SchemaProjector.project(sourceNestedSchema, sourceNestedStruct,
+ targetNestedSchema);
+ assertEquals(1, targetNestedStruct.get("first"));
+ assertEquals("abc", targetNestedStruct.get("second"));
+ assertEquals(Arrays.asList(1, 2), (List<Integer>) targetNestedStruct.get("array"));
+ assertEquals(Collections.singletonMap(5, "def"), (Map<Integer, String>) targetNestedStruct.get("map"));
+
+ Struct projectedStruct = (Struct) targetNestedStruct.get("nested");
+ assertEquals(113, projectedStruct.get("field"));
+ assertEquals(123, projectedStruct.get("field2"));
+ }
+
+ @Test
+ public void testLogicalTypeProjection() throws Exception {
+ Schema[] logicalTypeSchemas = {Decimal.schema(2), Date.SCHEMA, Time.SCHEMA, Timestamp.SCHEMA};
+ Object projected;
+
+ BigDecimal testDecimal = new BigDecimal(new BigInteger("156"), 2);
+ projected = SchemaProjector.project(Decimal.schema(2), testDecimal, Decimal.schema(2));
+ assertEquals(testDecimal, projected);
+
+ projected = SchemaProjector.project(Date.SCHEMA, 1000, Date.SCHEMA);
+ assertEquals(1000, projected);
+
+ projected = SchemaProjector.project(Time.SCHEMA, 231, Time.SCHEMA);
+ assertEquals(231, projected);
+
+ projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
+ assertEquals(34567L, projected);
+
+ Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
+ for (Schema logicalTypeSchema: logicalTypeSchemas) {
+ try {
+ SchemaProjector.project(logicalTypeSchema, null, Schema.BOOLEAN_SCHEMA);
+ fail("Cannot project logical types to non-logical types.");
+ } catch (SchemaProjectorException e) {
+ // expected
+ }
+
+ try {
+ SchemaProjector.project(logicalTypeSchema, null, namedSchema);
+ fail("Reader name is not a valid logical type name.");
+ } catch (SchemaProjectorException e) {
+ // expected
+ }
+
+ try {
+ SchemaProjector.project(Schema.BOOLEAN_SCHEMA, null, logicalTypeSchema);
+ fail("Cannot project non-logical types to logical types.");
+ } catch (SchemaProjectorException e) {
+ // expected
+ }
+ }
+ }
+
+ @Test
+ public void testArrayProjection() throws Exception {
+ Schema source = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
+
+ Object projected = SchemaProjector.project(source, Arrays.asList(1, 2, 3), source);
+ assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected);
+
+ Schema optionalSource = SchemaBuilder.array(Schema.INT32_SCHEMA).optional().build();
+ Schema target = SchemaBuilder.array(Schema.INT32_SCHEMA).defaultValue(Arrays.asList(1, 2, 3)).build();
+ projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), target);
+ assertEquals(Arrays.asList(4, 5), (List<Integer>) projected);
+ projected = SchemaProjector.project(optionalSource, null, target);
+ assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected);
+
+ Schema promotedTarget = SchemaBuilder.array(Schema.INT64_SCHEMA).defaultValue(Arrays.asList(1L, 2L, 3L)).build();
+ projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), promotedTarget);
+ List<Long> expectedProjected = Arrays.asList(4L, 5L);
+ assertEquals(expectedProjected, (List<Long>) projected);
+ projected = SchemaProjector.project(optionalSource, null, promotedTarget);
+ assertEquals(Arrays.asList(1L, 2L, 3L), (List<Long>) projected);
+
+ Schema noDefaultValueTarget = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
+ try {
+ SchemaProjector.project(optionalSource, null, noDefaultValueTarget);
+ fail("Target schema does not provide a default value.");
+ } catch (SchemaProjectorException e) {
+ // expected
+ }
+
+ Schema nonPromotableTarget = SchemaBuilder.array(Schema.BOOLEAN_SCHEMA).build();
+ try {
+ SchemaProjector.project(optionalSource, null, nonPromotableTarget);
+ fail("Neither source type matches target type nor source type can be promoted to target type");
+ } catch (SchemaProjectorException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testMapProjection() throws Exception {
+ Schema source = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).optional().build();
+
+ Schema target = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).defaultValue(Collections.singletonMap(1, 2)).build();
+ Object projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), target);
+ assertEquals(Collections.singletonMap(3, 4), (Map<Integer, Integer>) projected);
+ projected = SchemaProjector.project(source, null, target);
+ assertEquals(Collections.singletonMap(1, 2), (Map<Integer, Integer>) projected);
+
+ Schema promotedTarget = SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA).defaultValue(
+ Collections.singletonMap(3L, 4.5F)).build();
+ projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), promotedTarget);
+ assertEquals(Collections.singletonMap(3L, 4.F), (Map<Long, Float>) projected);
+ projected = SchemaProjector.project(source, null, promotedTarget);
+ assertEquals(Collections.singletonMap(3L, 4.5F), (Map<Long, Float>) projected);
+
+ Schema noDefaultValueTarget = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build();
+ try {
+ SchemaProjector.project(source, null, noDefaultValueTarget);
+ fail("Reader does not provide a default value.");
+ } catch (SchemaProjectorException e) {
+ // expected
+ }
+
+ Schema nonPromotableTarget = SchemaBuilder.map(Schema.BOOLEAN_SCHEMA, Schema.STRING_SCHEMA).build();
+ try {
+ SchemaProjector.project(source, null, nonPromotableTarget);
+ fail("Neither source type matches target type nor source type can be promoted to target type");
+ } catch (SchemaProjectorException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testMaybeCompatible() throws Exception {
+ Schema source = SchemaBuilder.int32().name("source").build();
+ Schema target = SchemaBuilder.int32().name("target").build();
+
+ try {
+ SchemaProjector.project(source, 12, target);
+ fail("Source name and target name mismatch.");
+ } catch (SchemaProjectorException e) {
+ // expected
+ }
+
+ Schema targetWithParameters = SchemaBuilder.int32().parameters(Collections.singletonMap("key", "value"));
+ try {
+ SchemaProjector.project(source, 34, targetWithParameters);
+ fail("Source parameters and target parameters mismatch.");
+ } catch (SchemaProjectorException e) {
+ // expected
+ }
+ }
+
+ private void verifyOptionalProjection(Schema source, Type targetType, Object value, Object defaultValue, Object expectedProjected, boolean optional) {
+ Schema target;
+ assert source.isOptional();
+ assert value != null;
+ if (optional) {
+ target = SchemaBuilder.type(targetType).optional().defaultValue(defaultValue).build();
+ } else {
+ target = SchemaBuilder.type(targetType).defaultValue(defaultValue).build();
+ }
+ Object projected = SchemaProjector.project(source, value, target);
+ if (targetType == Type.FLOAT64) {
+ assertEquals((double) expectedProjected, (double) projected, 1e-6);
+ } else {
+ assertEquals(expectedProjected, projected);
+ }
+
+ projected = SchemaProjector.project(source, null, target);
+ if (optional) {
+ assertEquals(null, projected);
+ } else {
+ assertEquals(defaultValue, projected);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
new file mode 100644
index 0000000..c73992b
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class StructTest {
+
+ private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct()
+ .field("int8", Schema.INT8_SCHEMA)
+ .field("int16", Schema.INT16_SCHEMA)
+ .field("int32", Schema.INT32_SCHEMA)
+ .field("int64", Schema.INT64_SCHEMA)
+ .field("float32", Schema.FLOAT32_SCHEMA)
+ .field("float64", Schema.FLOAT64_SCHEMA)
+ .field("boolean", Schema.BOOLEAN_SCHEMA)
+ .field("string", Schema.STRING_SCHEMA)
+ .field("bytes", Schema.BYTES_SCHEMA)
+ .build();
+
+ private static final Schema ARRAY_SCHEMA = SchemaBuilder.array(Schema.INT8_SCHEMA).build();
+ private static final Schema MAP_SCHEMA = SchemaBuilder.map(
+ Schema.INT32_SCHEMA,
+ Schema.STRING_SCHEMA
+ ).build();
+ private static final Schema NESTED_CHILD_SCHEMA = SchemaBuilder.struct()
+ .field("int8", Schema.INT8_SCHEMA)
+ .build();
+ private static final Schema NESTED_SCHEMA = SchemaBuilder.struct()
+ .field("array", ARRAY_SCHEMA)
+ .field("map", MAP_SCHEMA)
+ .field("nested", NESTED_CHILD_SCHEMA)
+ .build();
+
+ private static final Schema REQUIRED_FIELD_SCHEMA = Schema.INT8_SCHEMA;
+ private static final Schema OPTIONAL_FIELD_SCHEMA = SchemaBuilder.int8().optional().build();
+ private static final Schema DEFAULT_FIELD_SCHEMA = SchemaBuilder.int8().defaultValue((byte) 0).build();
+
+ @Test
+ public void testFlatStruct() {
+ Struct struct = new Struct(FLAT_STRUCT_SCHEMA)
+ .put("int8", (byte) 12)
+ .put("int16", (short) 12)
+ .put("int32", 12)
+ .put("int64", (long) 12)
+ .put("float32", 12.f)
+ .put("float64", 12.)
+ .put("boolean", true)
+ .put("string", "foobar")
+ .put("bytes", "foobar".getBytes());
+
+ // Test equality, and also the type-specific getters
+ assertEquals((byte) 12, (byte) struct.getInt8("int8"));
+ assertEquals((short) 12, (short) struct.getInt16("int16"));
+ assertEquals(12, (int) struct.getInt32("int32"));
+ assertEquals((long) 12, (long) struct.getInt64("int64"));
+ assertEquals((Float) 12.f, struct.getFloat32("float32"));
+ assertEquals((Double) 12., struct.getFloat64("float64"));
+ assertEquals(true, struct.getBoolean("boolean"));
+ assertEquals("foobar", struct.getString("string"));
+ assertEquals(ByteBuffer.wrap("foobar".getBytes()), ByteBuffer.wrap(struct.getBytes("bytes")));
+
+ struct.validate();
+ }
+
+ @Test
+ public void testComplexStruct() {
+ List<Byte> array = Arrays.asList((byte) 1, (byte) 2);
+ Map<Integer, String> map = Collections.singletonMap(1, "string");
+ Struct struct = new Struct(NESTED_SCHEMA)
+ .put("array", array)
+ .put("map", map)
+ .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12));
+
+ // Separate the call to get the array and map to validate the typed get methods work properly
+ List<Byte> arrayExtracted = struct.getArray("array");
+ assertEquals(array, arrayExtracted);
+ Map<Byte, Byte> mapExtracted = struct.getMap("map");
+ assertEquals(map, mapExtracted);
+ assertEquals((byte) 12, struct.getStruct("nested").get("int8"));
+
+ struct.validate();
+ }
+
+
+ // These don't test all the ways validation can fail, just one for each element. See more extensive validation
+ // tests in SchemaTest. These are meant to ensure that we are invoking the same code path and that we do deeper
+ // inspection than just checking the class of the object
+
+ @Test(expected = DataException.class)
+ public void testInvalidFieldType() {
+ new Struct(FLAT_STRUCT_SCHEMA).put("int8", "should fail because this is a string, not int8");
+ }
+
+ @Test(expected = DataException.class)
+ public void testInvalidArrayFieldElements() {
+ new Struct(NESTED_SCHEMA).put("array", Arrays.asList("should fail since elements should be int8s"));
+ }
+
+ @Test(expected = DataException.class)
+ public void testInvalidMapKeyElements() {
+ new Struct(NESTED_SCHEMA).put("map", Collections.singletonMap("should fail because keys should be int8s", (byte) 12));
+ }
+
+ @Test(expected = DataException.class)
+ public void testInvalidStructFieldSchema() {
+ new Struct(NESTED_SCHEMA).put("nested", new Struct(MAP_SCHEMA));
+ }
+
+ @Test(expected = DataException.class)
+ public void testInvalidStructFieldValue() {
+ new Struct(NESTED_SCHEMA).put("nested", new Struct(NESTED_CHILD_SCHEMA));
+ }
+
+
+ @Test(expected = DataException.class)
+ public void testMissingFieldValidation() {
+ // Required int8 field
+ Schema schema = SchemaBuilder.struct().field("field", REQUIRED_FIELD_SCHEMA).build();
+ Struct struct = new Struct(schema);
+ struct.validate();
+ }
+
+ @Test
+ public void testMissingOptionalFieldValidation() {
+ Schema schema = SchemaBuilder.struct().field("field", OPTIONAL_FIELD_SCHEMA).build();
+ Struct struct = new Struct(schema);
+ struct.validate();
+ }
+
+ @Test
+ public void testMissingFieldWithDefaultValidation() {
+ Schema schema = SchemaBuilder.struct().field("field", DEFAULT_FIELD_SCHEMA).build();
+ Struct struct = new Struct(schema);
+ struct.validate();
+ }
+
+
+ @Test
+ public void testEquals() {
+ Struct struct1 = new Struct(FLAT_STRUCT_SCHEMA)
+ .put("int8", (byte) 12)
+ .put("int16", (short) 12)
+ .put("int32", 12)
+ .put("int64", (long) 12)
+ .put("float32", 12.f)
+ .put("float64", 12.)
+ .put("boolean", true)
+ .put("string", "foobar")
+ .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
+ Struct struct2 = new Struct(FLAT_STRUCT_SCHEMA)
+ .put("int8", (byte) 12)
+ .put("int16", (short) 12)
+ .put("int32", 12)
+ .put("int64", (long) 12)
+ .put("float32", 12.f)
+ .put("float64", 12.)
+ .put("boolean", true)
+ .put("string", "foobar")
+ .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
+ Struct struct3 = new Struct(FLAT_STRUCT_SCHEMA)
+ .put("int8", (byte) 12)
+ .put("int16", (short) 12)
+ .put("int32", 12)
+ .put("int64", (long) 12)
+ .put("float32", 12.f)
+ .put("float64", 12.)
+ .put("boolean", true)
+ .put("string", "mismatching string")
+ .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
+
+ assertEquals(struct1, struct2);
+ assertNotEquals(struct1, struct3);
+
+ List<Byte> array = Arrays.asList((byte) 1, (byte) 2);
+ Map<Integer, String> map = Collections.singletonMap(1, "string");
+ struct1 = new Struct(NESTED_SCHEMA)
+ .put("array", array)
+ .put("map", map)
+ .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12));
+ List<Byte> array2 = Arrays.asList((byte) 1, (byte) 2);
+ Map<Integer, String> map2 = Collections.singletonMap(1, "string");
+ struct2 = new Struct(NESTED_SCHEMA)
+ .put("array", array2)
+ .put("map", map2)
+ .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12));
+ List<Byte> array3 = Arrays.asList((byte) 1, (byte) 2, (byte) 3);
+ Map<Integer, String> map3 = Collections.singletonMap(2, "string");
+ struct3 = new Struct(NESTED_SCHEMA)
+ .put("array", array3)
+ .put("map", map3)
+ .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 13));
+
+ assertEquals(struct1, struct2);
+ assertNotEquals(struct1, struct3);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java
new file mode 100644
index 0000000..45bdc4e
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Test;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+public class TimeTest {
+ private static final GregorianCalendar EPOCH;
+ private static final GregorianCalendar EPOCH_PLUS_DATE_COMPONENT;
+ private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_MILLIS;
+ static {
+ EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+ EPOCH.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ EPOCH_PLUS_TEN_THOUSAND_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+ EPOCH_PLUS_TEN_THOUSAND_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC"));
+ EPOCH_PLUS_TEN_THOUSAND_MILLIS.add(Calendar.MILLISECOND, 10000);
+
+
+ EPOCH_PLUS_DATE_COMPONENT = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+ EPOCH_PLUS_DATE_COMPONENT.setTimeZone(TimeZone.getTimeZone("UTC"));
+ EPOCH_PLUS_DATE_COMPONENT.add(Calendar.DATE, 10000);
+ }
+
+ @Test
+ public void testBuilder() {
+ Schema plain = Time.SCHEMA;
+ assertEquals(Time.LOGICAL_NAME, plain.name());
+ assertEquals(1, (Object) plain.version());
+ }
+
+ @Test
+ public void testFromLogical() {
+ assertEquals(0, Time.fromLogical(Time.SCHEMA, EPOCH.getTime()));
+ assertEquals(10000, Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime()));
+ }
+
+ @Test(expected = DataException.class)
+ public void testFromLogicalInvalidSchema() {
+ Time.fromLogical(Time.builder().name("invalid").build(), EPOCH.getTime());
+ }
+
+ @Test(expected = DataException.class)
+ public void testFromLogicalInvalidHasDateComponents() {
+ Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_DATE_COMPONENT.getTime());
+ }
+
+ @Test
+ public void testToLogical() {
+ assertEquals(EPOCH.getTime(), Time.toLogical(Time.SCHEMA, 0));
+ assertEquals(EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime(), Time.toLogical(Time.SCHEMA, 10000));
+ }
+
+ @Test(expected = DataException.class)
+ public void testToLogicalInvalidSchema() {
+ Time.toLogical(Time.builder().name("invalid").build(), 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java
new file mode 100644
index 0000000..6121160
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Test;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+public class TimestampTest {
+ private static final GregorianCalendar EPOCH;
+ private static final GregorianCalendar EPOCH_PLUS_MILLIS;
+
+ private static final int NUM_MILLIS = 2000000000;
+ private static final long TOTAL_MILLIS = ((long) NUM_MILLIS) * 2;
+
+ static {
+ EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+ EPOCH.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+
+ EPOCH_PLUS_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+ EPOCH_PLUS_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC"));
+ EPOCH_PLUS_MILLIS.add(Calendar.MILLISECOND, NUM_MILLIS);
+ EPOCH_PLUS_MILLIS.add(Calendar.MILLISECOND, NUM_MILLIS);
+ }
+
+ @Test
+ public void testBuilder() {
+ Schema plain = Date.SCHEMA;
+ assertEquals(Date.LOGICAL_NAME, plain.name());
+ assertEquals(1, (Object) plain.version());
+ }
+
+ @Test
+ public void testFromLogical() {
+ assertEquals(0L, Timestamp.fromLogical(Timestamp.SCHEMA, EPOCH.getTime()));
+ assertEquals(TOTAL_MILLIS, Timestamp.fromLogical(Timestamp.SCHEMA, EPOCH_PLUS_MILLIS.getTime()));
+ }
+
+ @Test(expected = DataException.class)
+ public void testFromLogicalInvalidSchema() {
+ Timestamp.fromLogical(Timestamp.builder().name("invalid").build(), EPOCH.getTime());
+ }
+
+ @Test
+ public void testToLogical() {
+ assertEquals(EPOCH.getTime(), Timestamp.toLogical(Timestamp.SCHEMA, 0L));
+ assertEquals(EPOCH_PLUS_MILLIS.getTime(), Timestamp.toLogical(Timestamp.SCHEMA, TOTAL_MILLIS));
+ }
+
+ @Test(expected = DataException.class)
+ public void testToLogicalInvalidSchema() {
+ Date.toLogical(Date.builder().name("invalid").build(), 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
new file mode 100644
index 0000000..017b2d3
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+
+public class StringConverterTest {
+ private static final String TOPIC = "topic";
+ private static final String SAMPLE_STRING = "a string";
+
+ private StringConverter converter = new StringConverter();
+
+ @Test
+ public void testStringToBytes() throws UnsupportedEncodingException {
+ assertArrayEquals(SAMPLE_STRING.getBytes("UTF8"), converter.fromConnectData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING));
+ }
+
+ @Test
+ public void testNonStringToBytes() throws UnsupportedEncodingException {
+ assertArrayEquals("true".getBytes("UTF8"), converter.fromConnectData(TOPIC, Schema.BOOLEAN_SCHEMA, true));
+ }
+
+ @Test
+ public void testNullToBytes() {
+ assertEquals(null, converter.fromConnectData(TOPIC, Schema.OPTIONAL_STRING_SCHEMA, null));
+ }
+
+ @Test
+ public void testToBytesIgnoresSchema() throws UnsupportedEncodingException {
+ assertArrayEquals("true".getBytes("UTF8"), converter.fromConnectData(TOPIC, null, true));
+ }
+
+ @Test
+ public void testToBytesNonUtf8Encoding() throws UnsupportedEncodingException {
+ converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true);
+ assertArrayEquals(SAMPLE_STRING.getBytes("UTF-16"), converter.fromConnectData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING));
+ }
+
+ @Test
+ public void testBytesToString() {
+ SchemaAndValue data = converter.toConnectData(TOPIC, SAMPLE_STRING.getBytes());
+ assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
+ assertEquals(SAMPLE_STRING, data.value());
+ }
+
+ @Test
+ public void testBytesNullToString() {
+ SchemaAndValue data = converter.toConnectData(TOPIC, null);
+ assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
+ assertEquals(null, data.value());
+ }
+
+ @Test
+ public void testBytesToStringNonUtf8Encoding() throws UnsupportedEncodingException {
+ converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true);
+ SchemaAndValue data = converter.toConnectData(TOPIC, SAMPLE_STRING.getBytes("UTF-16"));
+ assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
+ assertEquals(SAMPLE_STRING, data.value());
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java b/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java
new file mode 100644
index 0000000..9f8338d
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConnectorUtilsTest {
+
+ private static final List<Integer> FIVE_ELEMENTS = Arrays.asList(1, 2, 3, 4, 5);
+
+ @Test
+ public void testGroupPartitions() {
+
+ List<List<Integer>> grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 1);
+ assertEquals(Arrays.asList(FIVE_ELEMENTS), grouped);
+
+ grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 2);
+ assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)), grouped);
+
+ grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 3);
+ assertEquals(Arrays.asList(Arrays.asList(1, 2),
+ Arrays.asList(3, 4),
+ Arrays.asList(5)), grouped);
+
+ grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 5);
+ assertEquals(Arrays.asList(Arrays.asList(1),
+ Arrays.asList(2),
+ Arrays.asList(3),
+ Arrays.asList(4),
+ Arrays.asList(5)), grouped);
+
+ grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 7);
+ assertEquals(Arrays.asList(Arrays.asList(1),
+ Arrays.asList(2),
+ Arrays.asList(3),
+ Arrays.asList(4),
+ Arrays.asList(5),
+ Collections.EMPTY_LIST,
+ Collections.EMPTY_LIST), grouped);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGroupPartitionsInvalidCount() {
+ ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
new file mode 100644
index 0000000..a73153f
--- /dev/null
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Very simple connector that works with the console. This connector supports both source and
+ * sink modes via its 'mode' setting.
+ */
+public class FileStreamSinkConnector extends SinkConnector {
+ public static final String FILE_CONFIG = "file";
+
+ private String filename;
+
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ filename = props.get(FILE_CONFIG);
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return FileStreamSinkTask.class;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ ArrayList<Map<String, String>> configs = new ArrayList<>();
+ for (int i = 0; i < maxTasks; i++) {
+ Map<String, String> config = new HashMap<>();
+ if (filename != null)
+ config.put(FILE_CONFIG, filename);
+ configs.add(config);
+ }
+ return configs;
+ }
+
+ @Override
+ public void stop() {
+ // Nothing to do since FileStreamSinkConnector has no background monitoring.
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
new file mode 100644
index 0000000..83ba6d4
--- /dev/null
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * FileStreamSinkTask writes records to stdout or a file.
+ */
+public class FileStreamSinkTask extends SinkTask {
+ private static final Logger log = LoggerFactory.getLogger(FileStreamSinkTask.class);
+
+ private String filename;
+ private PrintStream outputStream;
+
+ public FileStreamSinkTask() {
+ }
+
+ // for testing
+ public FileStreamSinkTask(PrintStream outputStream) {
+ filename = null;
+ this.outputStream = outputStream;
+ }
+
+ @Override
+ public String version() {
+ return new FileStreamSinkConnector().version();
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ filename = props.get(FileStreamSinkConnector.FILE_CONFIG);
+ if (filename == null) {
+ outputStream = System.out;
+ } else {
+ try {
+ outputStream = new PrintStream(new FileOutputStream(filename, true));
+ } catch (FileNotFoundException e) {
+ throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
+ }
+ }
+ }
+
+ @Override
+ public void put(Collection<SinkRecord> sinkRecords) {
+ for (SinkRecord record : sinkRecords) {
+ log.trace("Writing line to {}: {}", logFilename(), record.value());
+ outputStream.println(record.value());
+ }
+ }
+
+ @Override
+ public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+ log.trace("Flushing output stream for {}", logFilename());
+ outputStream.flush();
+ }
+
+ @Override
+ public void stop() {
+ if (outputStream != System.out)
+ outputStream.close();
+ }
+
+ private String logFilename() {
+ return filename == null ? "stdout" : filename;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
new file mode 100644
index 0000000..843e999
--- /dev/null
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Very simple connector that works with the console. This connector supports both source and
+ * sink modes via its 'mode' setting.
+ */
+public class FileStreamSourceConnector extends SourceConnector {
+ public static final String TOPIC_CONFIG = "topic";
+ public static final String FILE_CONFIG = "file";
+
+ private String filename;
+ private String topic;
+
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ filename = props.get(FILE_CONFIG);
+ topic = props.get(TOPIC_CONFIG);
+ if (topic == null || topic.isEmpty())
+ throw new ConnectException("FileStreamSourceConnector configuration must include 'topic' setting");
+ if (topic.contains(","))
+ throw new ConnectException("FileStreamSourceConnector should only have a single topic when used as a source.");
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return FileStreamSourceTask.class;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ ArrayList<Map<String, String>> configs = new ArrayList<>();
+ // Only one input stream makes sense.
+ Map<String, String> config = new HashMap<>();
+ if (filename != null)
+ config.put(FILE_CONFIG, filename);
+ config.put(TOPIC_CONFIG, topic);
+ configs.add(config);
+ return configs;
+ }
+
+ @Override
+ public void stop() {
+ // Nothing to do since FileStreamSourceConnector has no background monitoring.
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
new file mode 100644
index 0000000..f30b603
--- /dev/null
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * FileStreamSourceTask reads from stdin or a file.
+ */
+public class FileStreamSourceTask extends SourceTask {
+ private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
+ public static final String FILENAME_FIELD = "filename";
+ public static final String POSITION_FIELD = "position";
+ private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
+
+ private String filename;
+ private InputStream stream;
+ private BufferedReader reader = null;
+ private char[] buffer = new char[1024];
+ private int offset = 0;
+ private String topic = null;
+
+ private Long streamOffset;
+
+ @Override
+ public String version() {
+ return new FileStreamSourceConnector().version();
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
+ if (filename == null || filename.isEmpty()) {
+ stream = System.in;
+ // Tracking offset for stdin doesn't make sense
+ streamOffset = null;
+ reader = new BufferedReader(new InputStreamReader(stream));
+ }
+ topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
+ if (topic == null)
+ throw new ConnectException("FileStreamSourceTask config missing topic setting");
+ }
+
+ @Override
+ public List<SourceRecord> poll() throws InterruptedException {
+ if (stream == null) {
+ try {
+ stream = new FileInputStream(filename);
+ Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
+ if (offset != null) {
+ Object lastRecordedOffset = offset.get(POSITION_FIELD);
+ if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long))
+ throw new ConnectException("Offset position is the incorrect type");
+ if (lastRecordedOffset != null) {
+ log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
+ long skipLeft = (Long) lastRecordedOffset;
+ while (skipLeft > 0) {
+ try {
+ long skipped = stream.skip(skipLeft);
+ skipLeft -= skipped;
+ } catch (IOException e) {
+ log.error("Error while trying to seek to previous offset in file: ", e);
+ throw new ConnectException(e);
+ }
+ }
+ log.debug("Skipped to offset {}", lastRecordedOffset);
+ }
+ streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
+ } else {
+ streamOffset = 0L;
+ }
+ reader = new BufferedReader(new InputStreamReader(stream));
+ log.debug("Opened {} for reading", logFilename());
+ } catch (FileNotFoundException e) {
+ log.warn("Couldn't find file for FileStreamSourceTask, sleeping to wait for it to be created");
+ synchronized (this) {
+ this.wait(1000);
+ }
+ return null;
+ }
+ }
+
+ // Unfortunately we can't just use readLine() because it blocks in an uninterruptible way.
+ // Instead we have to manage splitting lines ourselves, using simple backoff when no new data
+ // is available.
+ try {
+ final BufferedReader readerCopy;
+ synchronized (this) {
+ readerCopy = reader;
+ }
+ if (readerCopy == null)
+ return null;
+
+ ArrayList<SourceRecord> records = null;
+
+ int nread = 0;
+ while (readerCopy.ready()) {
+ nread = readerCopy.read(buffer, offset, buffer.length - offset);
+ log.trace("Read {} bytes from {}", nread, logFilename());
+
+ if (nread > 0) {
+ offset += nread;
+ if (offset == buffer.length) {
+ char[] newbuf = new char[buffer.length * 2];
+ System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
+ buffer = newbuf;
+ }
+
+ String line;
+ do {
+ line = extractLine();
+ if (line != null) {
+ log.trace("Read a line from {}", logFilename());
+ if (records == null)
+ records = new ArrayList<>();
+ records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line));
+ }
+ new ArrayList<SourceRecord>();
+ } while (line != null);
+ }
+ }
+
+ if (nread <= 0)
+ synchronized (this) {
+ this.wait(1000);
+ }
+
+ return records;
+ } catch (IOException e) {
+ // Underlying stream was killed, probably as a result of calling stop. Allow to return
+ // null, and driving thread will handle any shutdown if necessary.
+ }
+ return null;
+ }
+
+ private String extractLine() {
+ int until = -1, newStart = -1;
+ for (int i = 0; i < offset; i++) {
+ if (buffer[i] == '\n') {
+ until = i;
+ newStart = i + 1;
+ break;
+ } else if (buffer[i] == '\r') {
+ // We need to check for \r\n, so we must skip this if we can't check the next char
+ if (i + 1 >= offset)
+ return null;
+
+ until = i;
+ newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
+ break;
+ }
+ }
+
+ if (until != -1) {
+ String result = new String(buffer, 0, until);
+ System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
+ offset = offset - newStart;
+ if (streamOffset != null)
+ streamOffset += newStart;
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void stop() {
+ log.trace("Stopping");
+ synchronized (this) {
+ try {
+ if (stream != null && stream != System.in) {
+ stream.close();
+ log.trace("Closed input stream");
+ }
+ } catch (IOException e) {
+ log.error("Failed to close FileStreamSourceTask stream: ", e);
+ }
+ this.notify();
+ }
+ }
+
+ private Map<String, String> offsetKey(String filename) {
+ return Collections.singletonMap(FILENAME_FIELD, filename);
+ }
+
+ private Map<String, Long> offsetValue(Long pos) {
+ return Collections.singletonMap(POSITION_FIELD, pos);
+ }
+
+ private String logFilename() {
+ return filename == null ? "stdin" : filename;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
new file mode 100644
index 0000000..5ed03f4
--- /dev/null
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class FileStreamSinkConnectorTest {
+
+ private static final String MULTIPLE_TOPICS = "test1,test2";
+ private static final String[] MULTIPLE_TOPICS_LIST
+ = MULTIPLE_TOPICS.split(",");
+ private static final List<TopicPartition> MULTIPLE_TOPICS_PARTITIONS = Arrays.asList(
+ new TopicPartition("test1", 1), new TopicPartition("test2", 2)
+ );
+ private static final String FILENAME = "/afilename";
+
+ private FileStreamSinkConnector connector;
+ private ConnectorContext ctx;
+ private Map<String, String> sinkProperties;
+
+ @Before
+ public void setup() {
+ connector = new FileStreamSinkConnector();
+ ctx = PowerMock.createMock(ConnectorContext.class);
+ connector.initialize(ctx);
+
+ sinkProperties = new HashMap<>();
+ sinkProperties.put(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS);
+ sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, FILENAME);
+ }
+
+ @Test
+ public void testSinkTasks() {
+ PowerMock.replayAll();
+
+ connector.start(sinkProperties);
+ List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
+ assertEquals(1, taskConfigs.size());
+ assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
+
+ taskConfigs = connector.taskConfigs(2);
+ assertEquals(2, taskConfigs.size());
+ for (int i = 0; i < 2; i++) {
+ assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
+ }
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testTaskClass() {
+ PowerMock.replayAll();
+
+ connector.start(sinkProperties);
+ assertEquals(FileStreamSinkTask.class, connector.taskClass());
+
+ PowerMock.verifyAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
new file mode 100644
index 0000000..754e7f5
--- /dev/null
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class FileStreamSinkTaskTest {
+
+ private FileStreamSinkTask task;
+ private ByteArrayOutputStream os;
+ private PrintStream printStream;
+
+ @Before
+ public void setup() {
+ os = new ByteArrayOutputStream();
+ printStream = new PrintStream(os);
+ task = new FileStreamSinkTask(printStream);
+ }
+
+ @Test
+ public void testPutFlush() {
+ HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+
+ // We do not call task.start() since it would override the output stream
+
+ task.put(Arrays.asList(
+ new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 1)
+ ));
+ offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L));
+ task.flush(offsets);
+ assertEquals("line1\n", os.toString());
+
+ task.put(Arrays.asList(
+ new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line2", 2),
+ new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line3", 1)
+ ));
+ offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(2L));
+ offsets.put(new TopicPartition("topic2", 0), new OffsetAndMetadata(1L));
+ task.flush(offsets);
+ assertEquals("line1\nline2\nline3\n", os.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
new file mode 100644
index 0000000..80ff7f5
--- /dev/null
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class FileStreamSourceConnectorTest {
+
+ private static final String SINGLE_TOPIC = "test";
+ private static final String MULTIPLE_TOPICS = "test1,test2";
+ private static final String FILENAME = "/somefilename";
+
+ private FileStreamSourceConnector connector;
+ private ConnectorContext ctx;
+ private Map<String, String> sourceProperties;
+
+ @Before
+ public void setup() {
+ connector = new FileStreamSourceConnector();
+ ctx = PowerMock.createMock(ConnectorContext.class);
+ connector.initialize(ctx);
+
+ sourceProperties = new HashMap<>();
+ sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC);
+ sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, FILENAME);
+ }
+
+ @Test
+ public void testSourceTasks() {
+ PowerMock.replayAll();
+
+ connector.start(sourceProperties);
+ List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
+ assertEquals(1, taskConfigs.size());
+ assertEquals(FILENAME,
+ taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
+ assertEquals(SINGLE_TOPIC,
+ taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG));
+
+ // Should be able to return fewer than requested #
+ taskConfigs = connector.taskConfigs(2);
+ assertEquals(1, taskConfigs.size());
+ assertEquals(FILENAME,
+ taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
+ assertEquals(SINGLE_TOPIC,
+ taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG));
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testSourceTasksStdin() {
+ PowerMock.replayAll();
+
+ sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+ connector.start(sourceProperties);
+ List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
+ assertEquals(1, taskConfigs.size());
+ assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
+
+ PowerMock.verifyAll();
+ }
+
+ @Test(expected = ConnectException.class)
+ public void testMultipleSourcesInvalid() {
+ sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
+ connector.start(sourceProperties);
+ }
+
+ @Test
+ public void testTaskClass() {
+ PowerMock.replayAll();
+
+ connector.start(sourceProperties);
+ assertEquals(FileStreamSourceTask.class, connector.taskClass());
+
+ PowerMock.verifyAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
new file mode 100644
index 0000000..3689313
--- /dev/null
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class FileStreamSourceTaskTest {
+
+ private static final String TOPIC = "test";
+
+ private File tempFile;
+ private Map<String, String> config;
+ private OffsetStorageReader offsetStorageReader;
+ private SourceTaskContext context;
+ private FileStreamSourceTask task;
+
+ private boolean verifyMocks = false;
+
+ @Before
+ public void setup() throws IOException {
+ tempFile = File.createTempFile("file-stream-source-task-test", null);
+ config = new HashMap<>();
+ config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
+ config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
+ task = new FileStreamSourceTask();
+ offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
+ context = PowerMock.createMock(SourceTaskContext.class);
+ task.initialize(context);
+ }
+
+ @After
+ public void teardown() {
+ tempFile.delete();
+
+ if (verifyMocks)
+ PowerMock.verifyAll();
+ }
+
+ private void replay() {
+ PowerMock.replayAll();
+ verifyMocks = true;
+ }
+
+ @Test
+ public void testNormalLifecycle() throws InterruptedException, IOException {
+ expectOffsetLookupReturnNone();
+ replay();
+
+ task.start(config);
+
+ FileOutputStream os = new FileOutputStream(tempFile);
+ assertEquals(null, task.poll());
+ os.write("partial line".getBytes());
+ os.flush();
+ assertEquals(null, task.poll());
+ os.write(" finished\n".getBytes());
+ os.flush();
+ List<SourceRecord> records = task.poll();
+ assertEquals(1, records.size());
+ assertEquals(TOPIC, records.get(0).topic());
+ assertEquals("partial line finished", records.get(0).value());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 22L), records.get(0).sourceOffset());
+ assertEquals(null, task.poll());
+
+ // Different line endings, and make sure the final \r doesn't result in a line until we can
+ // read the subsequent byte.
+ os.write("line1\rline2\r\nline3\nline4\n\r".getBytes());
+ os.flush();
+ records = task.poll();
+ assertEquals(4, records.size());
+ assertEquals("line1", records.get(0).value());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 28L), records.get(0).sourceOffset());
+ assertEquals("line2", records.get(1).value());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(1).sourcePartition());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 35L), records.get(1).sourceOffset());
+ assertEquals("line3", records.get(2).value());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(2).sourcePartition());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 41L), records.get(2).sourceOffset());
+ assertEquals("line4", records.get(3).value());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(3).sourcePartition());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 47L), records.get(3).sourceOffset());
+
+ os.write("subsequent text".getBytes());
+ os.flush();
+ records = task.poll();
+ assertEquals(1, records.size());
+ assertEquals("", records.get(0).value());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset());
+
+ task.stop();
+ }
+
+ @Test(expected = ConnectException.class)
+ public void testMissingTopic() throws InterruptedException {
+ replay();
+
+ config.remove(FileStreamSourceConnector.TOPIC_CONFIG);
+ task.start(config);
+ }
+
+ public void testInvalidFile() throws InterruptedException {
+ config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
+ task.start(config);
+ // Currently the task retries indefinitely if the file isn't found, but shouldn't return any data.
+ for (int i = 0; i < 100; i++)
+ assertEquals(null, task.poll());
+ }
+
+
+ private void expectOffsetLookupReturnNone() {
+ EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
+ EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null);
+ }
+}
\ No newline at end of file