You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/09 07:11:43 UTC
[24/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
new file mode 100644
index 0000000..1890062
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * SourceRecords are generated by SourceTasks and passed to Kafka Connect for storage in
+ * Kafka. In addition to the standard fields in {@link ConnectRecord} which specify where data is stored
+ * in Kafka, they also include a sourcePartition and sourceOffset.
+ * </p>
+ * <p>
+ * The sourcePartition represents a single input sourcePartition that the record came from (e.g. a filename, table
+ * name, or topic-partition). The sourceOffset represents a position in that sourcePartition which can be used
+ * to resume consumption of data.
+ * </p>
+ * <p>
+ * These values can have arbitrary structure and should be represented using
+ * org.apache.kafka.connect.data objects (or primitive values). For example, a database connector
+ * might specify the sourcePartition as a record containing { "db": "database_name", "table":
+ * "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public class SourceRecord extends ConnectRecord {
+ private final Map<String, ?> sourcePartition;
+ private final Map<String, ?> sourceOffset;
+
+ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
+ String topic, Integer partition, Schema valueSchema, Object value) {
+ this(sourcePartition, sourceOffset, topic, partition, null, null, valueSchema, value);
+ }
+
+ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
+ String topic, Schema valueSchema, Object value) {
+ this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value);
+ }
+
+ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
+ String topic, Integer partition,
+ Schema keySchema, Object key, Schema valueSchema, Object value) {
+ super(topic, partition, keySchema, key, valueSchema, value);
+ this.sourcePartition = sourcePartition;
+ this.sourceOffset = sourceOffset;
+ }
+
+ public Map<String, ?> sourcePartition() {
+ return sourcePartition;
+ }
+
+ public Map<String, ?> sourceOffset() {
+ return sourceOffset;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ if (!super.equals(o))
+ return false;
+
+ SourceRecord that = (SourceRecord) o;
+
+ if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null)
+ return false;
+ if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0);
+ result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "SourceRecord{" +
+ "sourcePartition=" + sourcePartition +
+ ", sourceOffset=" + sourceOffset +
+ "} " + super.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
new file mode 100644
index 0000000..5110504
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.connector.Task;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SourceTask is a Task that pulls records from another system for storage in Kafka.
+ */
+@InterfaceStability.Unstable
+public abstract class SourceTask implements Task {
+
+ protected SourceTaskContext context;
+
+ /**
+ * Initialize this SourceTask with the specified context object.
+ */
+ public void initialize(SourceTaskContext context) {
+ this.context = context;
+ }
+
+ /**
+ * Start the Task. This should handle any configuration parsing and one-time setup of the task.
+ * @param props initial configuration
+ */
+ @Override
+ public abstract void start(Map<String, String> props);
+
+ /**
+ * Poll this SourceTask for new records. This method should block if no data is currently
+ * available.
+ *
+ * @return a list of source records
+ */
+ public abstract List<SourceRecord> poll() throws InterruptedException;
+
+ /**
+ * <p>
+ * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
+ * method should block until the commit is complete.
+ * </p>
+ * <p>
+ * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
+ * automatically. This hook is provided for systems that also need to store offsets internally
+ * in their own system.
+ * </p>
+ */
+ public void commit() throws InterruptedException {
+ // This space intentionally left blank.
+ }
+
+ /**
+ * Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop
+ * trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has
+ * fully stopped. Note that this method necessarily may be invoked from a different thread than {@link #poll()} and
+ * {@link #commit()}.
+ *
+ * For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method
+ * could set a flag that will force {@link #poll()} to exit immediately and invoke
+ * {@link java.nio.channels.Selector#wakeup()} to interrupt any ongoing requests.
+ */
+ public abstract void stop();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
new file mode 100644
index 0000000..200fa5f
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+
+/**
+ * SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying
+ * runtime.
+ */
+@InterfaceStability.Unstable
+public interface SourceTaskContext {
+ /**
+ * Get the OffsetStorageReader for this SourceTask.
+ */
+ OffsetStorageReader offsetStorageReader();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
new file mode 100644
index 0000000..158ddb1
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+
+import java.util.Map;
+
+/**
+ * The Converter interface provides support for translating between Kafka Connect's runtime data format
+ * and byte[]. Internally, this likely includes an intermediate step to the format used by the serialization
+ * layer (e.g. JsonNode, GenericRecord, Message).
+ */
+@InterfaceStability.Unstable
+public interface Converter {
+
+ /**
+ * Configure this class.
+ * @param configs configs in key/value pairs
+ * @param isKey whether is for key or value
+ */
+ void configure(Map<String, ?> configs, boolean isKey);
+
+ /**
+ * Convert a Kafka Connect data object to a native object for serialization.
+ * @param topic the topic associated with the data
+ * @param schema the schema for the value
+ * @param value the value to convert
+ * @return
+ */
+ byte[] fromConnectData(String topic, Schema schema, Object value);
+
+ /**
+ * Convert a native object to a Kafka Connect data object.
+ * @param topic the topic associated with the data
+ * @param value the value to convert
+ * @return an object containing the {@link Schema} and the converted value
+ */
+ SchemaAndValue toConnectData(String topic, byte[] value);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java
new file mode 100644
index 0000000..9307c23
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * <p>
+ * OffsetStorageReader provides access to the offset storage used by sources. This can be used by
+ * connectors to determine offsets to start consuming data from. This is most commonly used during
+ * initialization of a task, but can also be used during runtime, e.g. when reconfiguring a task.
+ * </p>
+ * <p>
+ * Offsets are always defined as Maps of Strings to primitive types, i.e. all types supported by
+ * {@link org.apache.kafka.connect.data.Schema} other than Array, Map, and Struct.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public interface OffsetStorageReader {
+ /**
+ * Get the offset for the specified partition. If the data isn't already available locally, this
+ * gets it from the backing store, which may require some network round trips.
+ *
+ * @param partition object uniquely identifying the partition of data
+ * @return object uniquely identifying the offset in the partition of data
+ */
+ <T> Map<String, Object> offset(Map<String, T> partition);
+
+ /**
+ * <p>
+ * Get a set of offsets for the specified partition identifiers. This may be more efficient
+ * than calling {@link #offset(Map)} repeatedly.
+ * </p>
+ * <p>
+ * Note that when errors occur, this method omits the associated data and tries to return as
+ * many of the requested values as possible. This allows a task that's managing many partitions to
+ * still proceed with any available data. Therefore, implementations should take care to check
+ * that the data is actually available in the returned response. The only case when an
+ * exception will be thrown is if the entire request failed, e.g. because the underlying
+ * storage was unavailable.
+ * </p>
+ *
+ * @param partitions set of identifiers for partitions of data
+ * @return a map of partition identifiers to decoded offsets
+ */
+ <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
new file mode 100644
index 0000000..5859f18
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.DataException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link Converter} implementation that only supports serializing to strings. When converting Kafka Connect data to bytes,
+ * the schema will be ignored and {@link Object#toString()} will always be invoked to convert the data to a String.
+ * When converting from bytes to Kafka Connect format, the converter will only ever return an optional string schema and
+ * a string or null.
+ *
+ * Encoding configuration is identical to {@link StringSerializer} and {@link StringDeserializer}, but for convenience
+ * this class can also be configured to use the same encoding for both encoding and decoding with the converter.encoding
+ * setting.
+ */
+public class StringConverter implements Converter {
+ private final StringSerializer serializer = new StringSerializer();
+ private final StringDeserializer deserializer = new StringDeserializer();
+
+ public StringConverter() {
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ Map<String, Object> serializerConfigs = new HashMap<>();
+ serializerConfigs.putAll(configs);
+ Map<String, Object> deserializerConfigs = new HashMap<>();
+ deserializerConfigs.putAll(configs);
+
+ Object encodingValue = configs.get("converter.encoding");
+ if (encodingValue != null) {
+ serializerConfigs.put("serializer.encoding", encodingValue);
+ deserializerConfigs.put("deserializer.encoding", encodingValue);
+ }
+
+ serializer.configure(serializerConfigs, isKey);
+ deserializer.configure(deserializerConfigs, isKey);
+ }
+
+ @Override
+ public byte[] fromConnectData(String topic, Schema schema, Object value) {
+ try {
+ return serializer.serialize(topic, value == null ? null : value.toString());
+ } catch (SerializationException e) {
+ throw new DataException("Failed to serialize to a string: ", e);
+ }
+ }
+
+ @Override
+ public SchemaAndValue toConnectData(String topic, byte[] value) {
+ try {
+ return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, deserializer.deserialize(topic, value));
+ } catch (SerializationException e) {
+ throw new DataException("Failed to deserialize string: ", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/main/java/org/apache/kafka/connect/util/ConnectorUtils.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/util/ConnectorUtils.java b/connect/api/src/main/java/org/apache/kafka/connect/util/ConnectorUtils.java
new file mode 100644
index 0000000..35250eb
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/util/ConnectorUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utilities that connector implementations might find useful. Contains common building blocks
+ * for writing connectors.
+ */
+@InterfaceStability.Unstable
+public class ConnectorUtils {
+ /**
+ * Given a list of elements and a target number of groups, generates list of groups of
+ * elements to match the target number of groups, spreading them evenly among the groups.
+ * This generates groups with contiguous elements, which results in intuitive ordering if
+ * your elements are also ordered (e.g. alphabetical lists of table names if you sort
+ * table names alphabetically to generate the raw partitions) or can result in efficient
+ * partitioning if elements are sorted according to some criteria that affects performance
+ * (e.g. topic partitions with the same leader).
+ *
+ * @param elements list of elements to partition
+ * @param numGroups the number of output groups to generate.
+ */
+ public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
+ if (numGroups <= 0)
+ throw new IllegalArgumentException("Number of groups must be positive.");
+
+ List<List<T>> result = new ArrayList<>(numGroups);
+
+ // Each group has either n+1 or n raw partitions
+ int perGroup = elements.size() / numGroups;
+ int leftover = elements.size() - (numGroups * perGroup);
+
+ int assigned = 0;
+ for (int group = 0; group < numGroups; group++) {
+ int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
+ List<T> groupList = new ArrayList<>(numThisGroup);
+ for (int i = 0; i < numThisGroup; i++) {
+ groupList.add(elements.get(assigned));
+ assigned++;
+ }
+ result.add(groupList);
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
new file mode 100644
index 0000000..7ea1de2
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.connector;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConnectorReconfigurationTest {
+
+ @Test
+ public void testDefaultReconfigure() throws Exception {
+ TestConnector conn = new TestConnector(false);
+ conn.reconfigure(Collections.<String, String>emptyMap());
+ assertEquals(conn.stopOrder, 0);
+ assertEquals(conn.configureOrder, 1);
+ }
+
+ @Test(expected = ConnectException.class)
+ public void testReconfigureStopException() throws Exception {
+ TestConnector conn = new TestConnector(true);
+ conn.reconfigure(Collections.<String, String>emptyMap());
+ }
+
+ private static class TestConnector extends Connector {
+ private boolean stopException;
+ private int order = 0;
+ public int stopOrder = -1;
+ public int configureOrder = -1;
+
+ public TestConnector(boolean stopException) {
+ this.stopException = stopException;
+ }
+
+ @Override
+ public String version() {
+ return "1.0";
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+ configureOrder = order++;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int count) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+ stopOrder = order++;
+ if (stopException)
+ throw new ConnectException("error");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java
new file mode 100644
index 0000000..4388ade
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+
+public class ConnectSchemaTest {
+ private static final Schema MAP_INT_STRING_SCHEMA = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build();
+ private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct()
+ .field("field", Schema.INT32_SCHEMA)
+ .build();
+ private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
+ .field("first", Schema.INT32_SCHEMA)
+ .field("second", Schema.STRING_SCHEMA)
+ .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
+ .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build())
+ .field("nested", FLAT_STRUCT_SCHEMA)
+ .build();
+ private static final Schema PARENT_STRUCT_SCHEMA = SchemaBuilder.struct()
+ .field("nested", FLAT_STRUCT_SCHEMA)
+ .build();
+
+ @Test
+ public void testFieldsOnStructSchema() {
+ Schema schema = SchemaBuilder.struct()
+ .field("foo", Schema.BOOLEAN_SCHEMA)
+ .field("bar", Schema.INT32_SCHEMA)
+ .build();
+
+ assertEquals(2, schema.fields().size());
+ // Validate field lookup by name
+ Field foo = schema.field("foo");
+ assertEquals(0, foo.index());
+ Field bar = schema.field("bar");
+ assertEquals(1, bar.index());
+ // Any other field name should fail
+ assertNull(schema.field("other"));
+ }
+
+
+ @Test(expected = DataException.class)
+ public void testFieldsOnlyValidForStructs() {
+ Schema.INT8_SCHEMA.fields();
+ }
+
+ @Test
+ public void testValidateValueMatchingType() {
+ ConnectSchema.validateValue(Schema.INT8_SCHEMA, (byte) 1);
+ ConnectSchema.validateValue(Schema.INT16_SCHEMA, (short) 1);
+ ConnectSchema.validateValue(Schema.INT32_SCHEMA, 1);
+ ConnectSchema.validateValue(Schema.INT64_SCHEMA, (long) 1);
+ ConnectSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.f);
+ ConnectSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.);
+ ConnectSchema.validateValue(Schema.BOOLEAN_SCHEMA, true);
+ ConnectSchema.validateValue(Schema.STRING_SCHEMA, "a string");
+ ConnectSchema.validateValue(Schema.BYTES_SCHEMA, "a byte array".getBytes());
+ ConnectSchema.validateValue(Schema.BYTES_SCHEMA, ByteBuffer.wrap("a byte array".getBytes()));
+ ConnectSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3));
+ ConnectSchema.validateValue(
+ SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build(),
+ Collections.singletonMap(1, "value")
+ );
+ // Struct tests the basic struct layout + complex field types + nested structs
+ Struct structValue = new Struct(STRUCT_SCHEMA)
+ .put("first", 1)
+ .put("second", "foo")
+ .put("array", Arrays.asList(1, 2, 3))
+ .put("map", Collections.singletonMap(1, "value"))
+ .put("nested", new Struct(FLAT_STRUCT_SCHEMA).put("field", 12));
+ ConnectSchema.validateValue(STRUCT_SCHEMA, structValue);
+ }
+
+ @Test
+ public void testValidateValueMatchingLogicalType() {
+ ConnectSchema.validateValue(Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2));
+ ConnectSchema.validateValue(Date.SCHEMA, new java.util.Date(0));
+ ConnectSchema.validateValue(Time.SCHEMA, new java.util.Date(0));
+ ConnectSchema.validateValue(Timestamp.SCHEMA, new java.util.Date(0));
+ }
+
+ // To avoid requiring excessive numbers of tests, these checks for invalid types use a similar type where possible
+ // to only include a single test for each type
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchInt8() {
+ ConnectSchema.validateValue(Schema.INT8_SCHEMA, 1);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchInt16() {
+ ConnectSchema.validateValue(Schema.INT16_SCHEMA, 1);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchInt32() {
+ ConnectSchema.validateValue(Schema.INT32_SCHEMA, (long) 1);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchInt64() {
+ ConnectSchema.validateValue(Schema.INT64_SCHEMA, 1);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchFloat() {
+ ConnectSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.0);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchDouble() {
+ ConnectSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.f);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchBoolean() {
+ ConnectSchema.validateValue(Schema.BOOLEAN_SCHEMA, 1.f);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchString() {
+ // CharSequence is a similar type (supertype of String), but we restrict to String.
+ CharBuffer cbuf = CharBuffer.wrap("abc");
+ ConnectSchema.validateValue(Schema.STRING_SCHEMA, cbuf);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchBytes() {
+ ConnectSchema.validateValue(Schema.BYTES_SCHEMA, new Object[]{1, "foo"});
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchArray() {
+ ConnectSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList("a", "b", "c"));
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchArraySomeMatch() {
+ // Even if some match the right type, this should fail if any mismatch. In this case, type erasure loses
+ // the fact that the list is actually List<Object>, but we couldn't tell if only checking the first element
+ ConnectSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, "c"));
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchMapKey() {
+ ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap("wrong key type", "value"));
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchMapValue() {
+ ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap(1, 2));
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchMapSomeKeys() {
+ Map<Object, String> data = new HashMap<>();
+ data.put(1, "abc");
+ data.put("wrong", "it's as easy as one two three");
+ ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchMapSomeValues() {
+ Map<Integer, Object> data = new HashMap<>();
+ data.put(1, "abc");
+ data.put(2, "wrong".getBytes());
+ ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchStructWrongSchema() {
+ // Completely mismatching schemas
+ ConnectSchema.validateValue(
+ FLAT_STRUCT_SCHEMA,
+ new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1)
+ );
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchStructWrongNestedSchema() {
+ // Top-level schema matches, but nested does not.
+ ConnectSchema.validateValue(
+ PARENT_STRUCT_SCHEMA,
+ new Struct(PARENT_STRUCT_SCHEMA)
+ .put("nested", new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1))
+ );
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchDecimal() {
+ ConnectSchema.validateValue(Decimal.schema(2), new BigInteger("156"));
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchDate() {
+ ConnectSchema.validateValue(Date.SCHEMA, 1000L);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchTime() {
+ ConnectSchema.validateValue(Time.SCHEMA, 1000L);
+ }
+
+ @Test(expected = DataException.class)
+ public void testValidateValueMismatchTimestamp() {
+ ConnectSchema.validateValue(Timestamp.SCHEMA, 1000L);
+ }
+
+ @Test
+ public void testPrimitiveEquality() {
+ // Test that primitive types, which only need to consider all the type & metadata fields, handle equality correctly
+ ConnectSchema s1 = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "doc");
+ ConnectSchema s2 = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "doc");
+ ConnectSchema differentType = new ConnectSchema(Schema.Type.INT16, false, null, "name", 2, "doc");
+ ConnectSchema differentOptional = new ConnectSchema(Schema.Type.INT8, true, null, "name", 2, "doc");
+ ConnectSchema differentDefault = new ConnectSchema(Schema.Type.INT8, false, true, "name", 2, "doc");
+ ConnectSchema differentName = new ConnectSchema(Schema.Type.INT8, false, null, "otherName", 2, "doc");
+ ConnectSchema differentVersion = new ConnectSchema(Schema.Type.INT8, false, null, "name", 4, "doc");
+ ConnectSchema differentDoc = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "other doc");
+ ConnectSchema differentParameters = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "doc", Collections.singletonMap("param", "value"), null, null, null);
+
+ assertEquals(s1, s2);
+ assertNotEquals(s1, differentType);
+ assertNotEquals(s1, differentOptional);
+ assertNotEquals(s1, differentDefault);
+ assertNotEquals(s1, differentName);
+ assertNotEquals(s1, differentVersion);
+ assertNotEquals(s1, differentDoc);
+ assertNotEquals(s1, differentParameters);
+ }
+
+ @Test
+ public void testArrayEquality() {
+ // Validate that the value type for the array is tested for equality. This test makes sure the same schema object is
+ // never reused to ensure we're actually checking equality
+ ConnectSchema s1 = new ConnectSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build());
+ ConnectSchema s2 = new ConnectSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build());
+ ConnectSchema differentValueSchema = new ConnectSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int16().build());
+
+ assertEquals(s1, s2);
+ assertNotEquals(s1, differentValueSchema);
+ }
+
+ @Test
+ public void testMapEquality() {
+ // Same as testArrayEquality, but for both key and value schemas
+ ConnectSchema s1 = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
+ ConnectSchema s2 = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
+ ConnectSchema differentKeySchema = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.string().build(), SchemaBuilder.int16().build());
+ ConnectSchema differentValueSchema = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.string().build());
+
+ assertEquals(s1, s2);
+ assertNotEquals(s1, differentKeySchema);
+ assertNotEquals(s1, differentValueSchema);
+ }
+
+ @Test
+ public void testStructEquality() {
+ // Same as testArrayEquality, but checks differences in fields. Only does a simple check, relying on tests of
+ // Field's equals() method to validate all variations in the list of fields will be checked
+ ConnectSchema s1 = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
+ Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
+ new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
+ ConnectSchema s2 = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
+ Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
+ new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
+ ConnectSchema differentField = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
+ Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
+ new Field("different field name", 1, SchemaBuilder.int16().build())), null, null);
+
+ assertEquals(s1, s2);
+ assertNotEquals(s1, differentField);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java
new file mode 100644
index 0000000..8d6bd5a
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Test;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+public class DateTest {
+ private static final GregorianCalendar EPOCH;
+ private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_DAYS;
+ private static final GregorianCalendar EPOCH_PLUS_TIME_COMPONENT;
+ static {
+ EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+ EPOCH.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ EPOCH_PLUS_TIME_COMPONENT = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 1);
+ EPOCH_PLUS_TIME_COMPONENT.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ EPOCH_PLUS_TEN_THOUSAND_DAYS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+ EPOCH_PLUS_TEN_THOUSAND_DAYS.setTimeZone(TimeZone.getTimeZone("UTC"));
+ EPOCH_PLUS_TEN_THOUSAND_DAYS.add(Calendar.DATE, 10000);
+ }
+
+ @Test
+ public void testBuilder() {
+ Schema plain = Date.SCHEMA;
+ assertEquals(Date.LOGICAL_NAME, plain.name());
+ assertEquals(1, (Object) plain.version());
+ }
+
+ @Test
+ public void testFromLogical() {
+ assertEquals(0, Date.fromLogical(Date.SCHEMA, EPOCH.getTime()));
+ assertEquals(10000, Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime()));
+ }
+
+ @Test(expected = DataException.class)
+ public void testFromLogicalInvalidSchema() {
+ Date.fromLogical(Date.builder().name("invalid").build(), EPOCH.getTime());
+ }
+
+ @Test(expected = DataException.class)
+ public void testFromLogicalInvalidHasTimeComponents() {
+ Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TIME_COMPONENT.getTime());
+ }
+
+ @Test
+ public void testToLogical() {
+ assertEquals(EPOCH.getTime(), Date.toLogical(Date.SCHEMA, 0));
+ assertEquals(EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime(), Date.toLogical(Date.SCHEMA, 10000));
+ }
+
+ @Test(expected = DataException.class)
+ public void testToLogicalInvalidSchema() {
+ Date.toLogical(Date.builder().name("invalid").build(), 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/data/DecimalTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/DecimalTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/DecimalTest.java
new file mode 100644
index 0000000..27f570a
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/DecimalTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Collections;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class DecimalTest {
+ private static final int TEST_SCALE = 2;
+ private static final BigDecimal TEST_DECIMAL = new BigDecimal(new BigInteger("156"), TEST_SCALE);
+ private static final BigDecimal TEST_DECIMAL_NEGATIVE = new BigDecimal(new BigInteger("-156"), TEST_SCALE);
+ private static final byte[] TEST_BYTES = new byte[]{0, -100};
+ private static final byte[] TEST_BYTES_NEGATIVE = new byte[]{-1, 100};
+
+ @Test
+ public void testBuilder() {
+ Schema plain = Decimal.builder(2).build();
+ assertEquals(Decimal.LOGICAL_NAME, plain.name());
+ assertEquals(Collections.singletonMap(Decimal.SCALE_FIELD, "2"), plain.parameters());
+ assertEquals(1, (Object) plain.version());
+ }
+
+ @Test
+ public void testFromLogical() {
+ Schema schema = Decimal.schema(TEST_SCALE);
+ byte[] encoded = Decimal.fromLogical(schema, TEST_DECIMAL);
+ assertArrayEquals(TEST_BYTES, encoded);
+
+ encoded = Decimal.fromLogical(schema, TEST_DECIMAL_NEGATIVE);
+ assertArrayEquals(TEST_BYTES_NEGATIVE, encoded);
+ }
+
+ @Test
+ public void testToLogical() {
+ Schema schema = Decimal.schema(2);
+ BigDecimal converted = Decimal.toLogical(schema, TEST_BYTES);
+ assertEquals(TEST_DECIMAL, converted);
+
+ converted = Decimal.toLogical(schema, TEST_BYTES_NEGATIVE);
+ assertEquals(TEST_DECIMAL_NEGATIVE, converted);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/data/FieldTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/FieldTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/FieldTest.java
new file mode 100644
index 0000000..e7b3a9d
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/FieldTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class FieldTest {
+
+ @Test
+ public void testEquality() {
+ Field field1 = new Field("name", 0, Schema.INT8_SCHEMA);
+ Field field2 = new Field("name", 0, Schema.INT8_SCHEMA);
+ Field differentName = new Field("name2", 0, Schema.INT8_SCHEMA);
+ Field differentIndex = new Field("name", 1, Schema.INT8_SCHEMA);
+ Field differentSchema = new Field("name", 0, Schema.INT16_SCHEMA);
+
+ assertEquals(field1, field2);
+ assertNotEquals(field1, differentName);
+ assertNotEquals(field1, differentIndex);
+ assertNotEquals(field1, differentSchema);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java
new file mode 100644
index 0000000..62020f3
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.SchemaBuilderException;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class SchemaBuilderTest {
+ private static final String NAME = "name";
+ private static final Integer VERSION = 2;
+ private static final String DOC = "doc";
+ private static final Map<String, String> NO_PARAMS = null;
+
+ @Test
+ public void testInt8Builder() {
+ Schema schema = SchemaBuilder.int8().build();
+ assertTypeAndDefault(schema, Schema.Type.INT8, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.int8().name(NAME).optional().defaultValue((byte) 12)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.INT8, true, (byte) 12);
+ assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testInt8BuilderInvalidDefault() {
+ SchemaBuilder.int8().defaultValue("invalid");
+ }
+
+ @Test
+ public void testInt16Builder() {
+ Schema schema = SchemaBuilder.int16().build();
+ assertTypeAndDefault(schema, Schema.Type.INT16, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.int16().name(NAME).optional().defaultValue((short) 12)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.INT16, true, (short) 12);
+ assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testInt16BuilderInvalidDefault() {
+ SchemaBuilder.int16().defaultValue("invalid");
+ }
+
+ @Test
+ public void testInt32Builder() {
+ Schema schema = SchemaBuilder.int32().build();
+ assertTypeAndDefault(schema, Schema.Type.INT32, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.int32().name(NAME).optional().defaultValue(12)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.INT32, true, 12);
+ assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testInt32BuilderInvalidDefault() {
+ SchemaBuilder.int32().defaultValue("invalid");
+ }
+
+ @Test
+ public void testInt64Builder() {
+ Schema schema = SchemaBuilder.int64().build();
+ assertTypeAndDefault(schema, Schema.Type.INT64, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.int64().name(NAME).optional().defaultValue((long) 12)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.INT64, true, (long) 12);
+ assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testInt64BuilderInvalidDefault() {
+ SchemaBuilder.int64().defaultValue("invalid");
+ }
+
+ @Test
+ public void testFloatBuilder() {
+ Schema schema = SchemaBuilder.float32().build();
+ assertTypeAndDefault(schema, Schema.Type.FLOAT32, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.float32().name(NAME).optional().defaultValue(12.f)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.FLOAT32, true, 12.f);
+ assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testFloatBuilderInvalidDefault() {
+ SchemaBuilder.float32().defaultValue("invalid");
+ }
+
+ @Test
+ public void testDoubleBuilder() {
+ Schema schema = SchemaBuilder.float64().build();
+ assertTypeAndDefault(schema, Schema.Type.FLOAT64, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.float64().name(NAME).optional().defaultValue(12.0)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.FLOAT64, true, 12.0);
+ assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testDoubleBuilderInvalidDefault() {
+ SchemaBuilder.float64().defaultValue("invalid");
+ }
+
+ @Test
+ public void testBooleanBuilder() {
+ Schema schema = SchemaBuilder.bool().build();
+ assertTypeAndDefault(schema, Schema.Type.BOOLEAN, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.bool().name(NAME).optional().defaultValue(true)
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.BOOLEAN, true, true);
+ assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testBooleanBuilderInvalidDefault() {
+ SchemaBuilder.bool().defaultValue("invalid");
+ }
+
+ @Test
+ public void testStringBuilder() {
+ Schema schema = SchemaBuilder.string().build();
+ assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.string().name(NAME).optional().defaultValue("a default string")
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.STRING, true, "a default string");
+ assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testStringBuilderInvalidDefault() {
+ SchemaBuilder.string().defaultValue(true);
+ }
+
+ @Test
+ public void testBytesBuilder() {
+ Schema schema = SchemaBuilder.bytes().build();
+ assertTypeAndDefault(schema, Schema.Type.BYTES, false, null);
+ assertNoMetadata(schema);
+
+ schema = SchemaBuilder.bytes().name(NAME).optional().defaultValue("a default byte array".getBytes())
+ .version(VERSION).doc(DOC).build();
+ assertTypeAndDefault(schema, Schema.Type.BYTES, true, "a default byte array".getBytes());
+ assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testBytesBuilderInvalidDefault() {
+ SchemaBuilder.bytes().defaultValue("a string, not bytes");
+ }
+
+
+ @Test
+ public void testParameters() {
+ Map<String, String> expectedParameters = new HashMap<>();
+ expectedParameters.put("foo", "val");
+ expectedParameters.put("bar", "baz");
+
+ Schema schema = SchemaBuilder.string().parameter("foo", "val").parameter("bar", "baz").build();
+ assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
+ assertMetadata(schema, null, null, null, expectedParameters);
+
+ schema = SchemaBuilder.string().parameters(expectedParameters).build();
+ assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
+ assertMetadata(schema, null, null, null, expectedParameters);
+ }
+
+
+ @Test
+ public void testStructBuilder() {
+ Schema schema = SchemaBuilder.struct()
+ .field("field1", Schema.INT8_SCHEMA)
+ .field("field2", Schema.INT8_SCHEMA)
+ .build();
+ assertTypeAndDefault(schema, Schema.Type.STRUCT, false, null);
+ assertEquals(2, schema.fields().size());
+ assertEquals("field1", schema.fields().get(0).name());
+ assertEquals(0, schema.fields().get(0).index());
+ assertEquals(Schema.INT8_SCHEMA, schema.fields().get(0).schema());
+ assertEquals("field2", schema.fields().get(1).name());
+ assertEquals(1, schema.fields().get(1).index());
+ assertEquals(Schema.INT8_SCHEMA, schema.fields().get(1).schema());
+ assertNoMetadata(schema);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testNonStructCantHaveFields() {
+ SchemaBuilder.int8().field("field", SchemaBuilder.int8().build());
+ }
+
+
+ @Test
+ public void testArrayBuilder() {
+ Schema schema = SchemaBuilder.array(Schema.INT8_SCHEMA).build();
+ assertTypeAndDefault(schema, Schema.Type.ARRAY, false, null);
+ assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
+ assertNoMetadata(schema);
+
+ // Default value
+ List<Byte> defArray = Arrays.asList((byte) 1, (byte) 2);
+ schema = SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(defArray).build();
+ assertTypeAndDefault(schema, Schema.Type.ARRAY, false, defArray);
+ assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
+ assertNoMetadata(schema);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testArrayBuilderInvalidDefault() {
+ // Array, but wrong embedded type
+ SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(Arrays.asList("string")).build();
+ }
+
+ @Test
+ public void testMapBuilder() {
+ Schema schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA).build();
+ assertTypeAndDefault(schema, Schema.Type.MAP, false, null);
+ assertEquals(schema.keySchema(), Schema.INT8_SCHEMA);
+ assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
+ assertNoMetadata(schema);
+
+ // Default value
+ Map<Byte, Byte> defMap = Collections.singletonMap((byte) 5, (byte) 10);
+ schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA)
+ .defaultValue(defMap).build();
+ assertTypeAndDefault(schema, Schema.Type.MAP, false, defMap);
+ assertEquals(schema.keySchema(), Schema.INT8_SCHEMA);
+ assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
+ assertNoMetadata(schema);
+ }
+
+ @Test(expected = SchemaBuilderException.class)
+ public void testMapBuilderInvalidDefault() {
+ // Map, but wrong embedded type
+ Map<Byte, String> defMap = Collections.singletonMap((byte) 5, "foo");
+ SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA)
+ .defaultValue(defMap).build();
+ }
+
+
+
+ private void assertTypeAndDefault(Schema schema, Schema.Type type, boolean optional, Object defaultValue) {
+ assertEquals(type, schema.type());
+ assertEquals(optional, schema.isOptional());
+ if (type == Schema.Type.BYTES) {
+ // byte[] is not comparable, need to wrap to check correctly
+ if (defaultValue == null)
+ assertNull(schema.defaultValue());
+ else
+ assertEquals(ByteBuffer.wrap((byte[]) defaultValue), ByteBuffer.wrap((byte[]) schema.defaultValue()));
+ } else {
+ assertEquals(defaultValue, schema.defaultValue());
+ }
+ }
+
+ private void assertMetadata(Schema schema, String name, Integer version, String doc, Map<String, String> parameters) {
+ assertEquals(name, schema.name());
+ assertEquals(version, schema.version());
+ assertEquals(doc, schema.doc());
+ assertEquals(parameters, schema.parameters());
+ }
+
+ private void assertNoMetadata(Schema schema) {
+ assertMetadata(schema, null, null, null, null);
+ }
+}