You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/09 07:11:32 UTC
[13/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
deleted file mode 100644
index 104abf1..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-public class CopycatSchema implements Schema {
- /**
- * Maps Schema.Types to a list of Java classes that can be used to represent them.
- */
- private static final Map<Type, List<Class>> SCHEMA_TYPE_CLASSES = new HashMap<>();
- /**
- * Maps known logical types to a list of Java classes that can be used to represent them.
- */
- private static final Map<String, List<Class>> LOGICAL_TYPE_CLASSES = new HashMap<>();
-
- /**
- * Maps the Java classes to the corresponding Schema.Type.
- */
- private static final Map<Class<?>, Type> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>();
-
- static {
- SCHEMA_TYPE_CLASSES.put(Type.INT8, Arrays.asList((Class) Byte.class));
- SCHEMA_TYPE_CLASSES.put(Type.INT16, Arrays.asList((Class) Short.class));
- SCHEMA_TYPE_CLASSES.put(Type.INT32, Arrays.asList((Class) Integer.class));
- SCHEMA_TYPE_CLASSES.put(Type.INT64, Arrays.asList((Class) Long.class));
- SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Arrays.asList((Class) Float.class));
- SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Arrays.asList((Class) Double.class));
- SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Arrays.asList((Class) Boolean.class));
- SCHEMA_TYPE_CLASSES.put(Type.STRING, Arrays.asList((Class) String.class));
- // Bytes are special and have 2 representations. byte[] causes problems because it doesn't handle equals() and
- // hashCode() like we want objects to, so we support both byte[] and ByteBuffer. Using plain byte[] can cause
- // those methods to fail, so ByteBuffers are recommended
- SCHEMA_TYPE_CLASSES.put(Type.BYTES, Arrays.asList((Class) byte[].class, (Class) ByteBuffer.class));
- SCHEMA_TYPE_CLASSES.put(Type.ARRAY, Arrays.asList((Class) List.class));
- SCHEMA_TYPE_CLASSES.put(Type.MAP, Arrays.asList((Class) Map.class));
- SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Arrays.asList((Class) Struct.class));
-
- for (Map.Entry<Type, List<Class>> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) {
- for (Class<?> schemaClass : schemaClasses.getValue())
- JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey());
- }
-
- LOGICAL_TYPE_CLASSES.put(Decimal.LOGICAL_NAME, Arrays.asList((Class) BigDecimal.class));
- LOGICAL_TYPE_CLASSES.put(Date.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
- LOGICAL_TYPE_CLASSES.put(Time.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
- LOGICAL_TYPE_CLASSES.put(Timestamp.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
- // We don't need to put these into JAVA_CLASS_SCHEMA_TYPES since that's only used to determine schemas for
- // schemaless data and logical types will have ambiguous schemas (e.g. many of them use the same Java class) so
- // they should not be used without schemas.
- }
-
- // The type of the field
- private final Type type;
- private final boolean optional;
- private final Object defaultValue;
-
- private final List<Field> fields;
- private final Map<String, Field> fieldsByName;
-
- private final Schema keySchema;
- private final Schema valueSchema;
-
- // Optional name and version provide a built-in way to indicate what type of data is included. Most
- // useful for structs to indicate the semantics of the struct and map it to some existing underlying
- // serializer-specific schema. However, can also be useful in specifying other logical types (e.g. a set is an array
- // with additional constraints).
- private final String name;
- private final Integer version;
- // Optional human readable documentation describing this schema.
- private final String doc;
- private final Map<String, String> parameters;
-
- /**
- * Construct a Schema. Most users should not construct schemas manually, preferring {@link SchemaBuilder} instead.
- */
- public CopycatSchema(Type type, boolean optional, Object defaultValue, String name, Integer version, String doc, Map<String, String> parameters, List<Field> fields, Schema keySchema, Schema valueSchema) {
- this.type = type;
- this.optional = optional;
- this.defaultValue = defaultValue;
- this.name = name;
- this.version = version;
- this.doc = doc;
- this.parameters = parameters;
-
- this.fields = fields;
- if (this.fields != null && this.type == Type.STRUCT) {
- this.fieldsByName = new HashMap<>();
- for (Field field : fields)
- fieldsByName.put(field.name(), field);
- } else {
- this.fieldsByName = null;
- }
-
- this.keySchema = keySchema;
- this.valueSchema = valueSchema;
- }
-
- /**
- * Construct a Schema for a primitive type, setting schema parameters, struct fields, and key and value schemas to null.
- */
- public CopycatSchema(Type type, boolean optional, Object defaultValue, String name, Integer version, String doc) {
- this(type, optional, defaultValue, name, version, doc, null, null, null, null);
- }
-
- /**
- * Construct a default schema for a primitive type. The schema is required, has no default value, name, version,
- * or documentation.
- */
- public CopycatSchema(Type type) {
- this(type, false, null, null, null, null);
- }
-
- @Override
- public Type type() {
- return type;
- }
-
- @Override
- public boolean isOptional() {
- return optional;
- }
-
- @Override
- public Object defaultValue() {
- return defaultValue;
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public Integer version() {
- return version;
- }
-
- @Override
- public String doc() {
- return doc;
- }
-
- @Override
- public Map<String, String> parameters() {
- return parameters;
- }
-
- @Override
- public List<Field> fields() {
- if (type != Type.STRUCT)
- throw new DataException("Cannot list fields on non-struct type");
- return fields;
- }
-
- public Field field(String fieldName) {
- if (type != Type.STRUCT)
- throw new DataException("Cannot look up fields on non-struct type");
- return fieldsByName.get(fieldName);
- }
-
- @Override
- public Schema keySchema() {
- if (type != Type.MAP)
- throw new DataException("Cannot look up key schema on non-map type");
- return keySchema;
- }
-
- @Override
- public Schema valueSchema() {
- if (type != Type.MAP && type != Type.ARRAY)
- throw new DataException("Cannot look up value schema on non-array and non-map type");
- return valueSchema;
- }
-
-
-
- /**
- * Validate that the value can be used with the schema, i.e. that its type matches the schema type and nullability
- * requirements. Throws a DataException if the value is invalid.
- * @param schema Schema to test
- * @param value value to test
- */
- public static void validateValue(Schema schema, Object value) {
- if (value == null) {
- if (!schema.isOptional())
- throw new DataException("Invalid value: null used for required field");
- else
- return;
- }
-
- List<Class> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
-
- if (expectedClasses == null)
- expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
-
- if (expectedClasses == null)
- throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass());
-
- boolean foundMatch = false;
- for (Class<?> expectedClass : expectedClasses) {
- if (expectedClass.isInstance(value)) {
- foundMatch = true;
- break;
- }
- }
- if (!foundMatch)
- throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass());
-
- switch (schema.type()) {
- case STRUCT:
- Struct struct = (Struct) value;
- if (!struct.schema().equals(schema))
- throw new DataException("Struct schemas do not match.");
- struct.validate();
- break;
- case ARRAY:
- List<?> array = (List<?>) value;
- for (Object entry : array)
- validateValue(schema.valueSchema(), entry);
- break;
- case MAP:
- Map<?, ?> map = (Map<?, ?>) value;
- for (Map.Entry<?, ?> entry : map.entrySet()) {
- validateValue(schema.keySchema(), entry.getKey());
- validateValue(schema.valueSchema(), entry.getValue());
- }
- break;
- }
- }
-
- /**
- * Validate that the value can be used for this schema, i.e. that its type matches the schema type and optional
- * requirements. Throws a DataException if the value is invalid.
- * @param value the value to validate
- */
- public void validateValue(Object value) {
- validateValue(this, value);
- }
-
- @Override
- public CopycatSchema schema() {
- return this;
- }
-
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- CopycatSchema schema = (CopycatSchema) o;
- return Objects.equals(optional, schema.optional) &&
- Objects.equals(type, schema.type) &&
- Objects.equals(defaultValue, schema.defaultValue) &&
- Objects.equals(fields, schema.fields) &&
- Objects.equals(keySchema, schema.keySchema) &&
- Objects.equals(valueSchema, schema.valueSchema) &&
- Objects.equals(name, schema.name) &&
- Objects.equals(version, schema.version) &&
- Objects.equals(doc, schema.doc) &&
- Objects.equals(parameters, schema.parameters);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(type, optional, defaultValue, fields, keySchema, valueSchema, name, version, doc, parameters);
- }
-
- @Override
- public String toString() {
- if (name != null)
- return "Schema{" + name + ":" + type + "}";
- else
- return "Schema{" + type + "}";
- }
-
-
- /**
- * Get the {@link Type} associated with the given class.
- *
- * @param klass the Class to look up
- * @return the corresponding type, or null if there is no matching type
- */
- public static Type schemaType(Class<?> klass) {
- synchronized (JAVA_CLASS_SCHEMA_TYPES) {
- Type schemaType = JAVA_CLASS_SCHEMA_TYPES.get(klass);
- if (schemaType != null)
- return schemaType;
-
- // Since the lookup only checks the exact class, we also need to try each registered class as a possible supertype
- for (Map.Entry<Class<?>, Type> entry : JAVA_CLASS_SCHEMA_TYPES.entrySet()) {
- try {
- klass.asSubclass(entry.getKey());
- // Cache this for subsequent lookups
- JAVA_CLASS_SCHEMA_TYPES.put(klass, entry.getValue());
- return entry.getValue();
- } catch (ClassCastException e) {
- // Expected, ignore
- }
- }
- }
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java
deleted file mode 100644
index 4e14659..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.util.Calendar;
-import java.util.TimeZone;
-
-/**
- * <p>
- * A date representing a calendar day with no time of day or timezone. The corresponding Java type is a java.util.Date
- * with hours, minutes, seconds, milliseconds set to 0. The underlying representation is an integer representing the
- * number of standardized days (based on a number of milliseconds with 24 hours/day, 60 minutes/hour, 60 seconds/minute,
- * 1000 milliseconds/second) since Unix epoch.
- * </p>
- */
-public class Date {
- public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Date";
-
- private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
-
- private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-
- /**
- * Returns a SchemaBuilder for a Date. By returning a SchemaBuilder you can override additional schema settings such
- * as required/optional, default value, and documentation.
- * @return a SchemaBuilder
- */
- public static SchemaBuilder builder() {
- return SchemaBuilder.int32()
- .name(LOGICAL_NAME)
- .version(1);
- }
-
- public static final Schema SCHEMA = builder().schema();
-
- /**
- * Convert a value from its logical format (Date) to its encoded format.
- * @param value the logical value
- * @return the encoded value
- */
- public static int fromLogical(Schema schema, java.util.Date value) {
- if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
- throw new DataException("Requested conversion of Date object but the schema does not match.");
- Calendar calendar = Calendar.getInstance(UTC);
- calendar.setTime(value);
- if (calendar.get(Calendar.HOUR_OF_DAY) != 0 || calendar.get(Calendar.MINUTE) != 0 ||
- calendar.get(Calendar.SECOND) != 0 || calendar.get(Calendar.MILLISECOND) != 0) {
- throw new DataException("Copycat Date type should not have any time fields set to non-zero values.");
- }
- long unixMillis = calendar.getTimeInMillis();
- return (int) (unixMillis / MILLIS_PER_DAY);
- }
-
- public static java.util.Date toLogical(Schema schema, int value) {
- if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
- throw new DataException("Requested conversion of Date object but the schema does not match.");
- return new java.util.Date(value * MILLIS_PER_DAY);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java
deleted file mode 100644
index f23e13e..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-
-/**
- * <p>
- * An arbitrary-precision signed decimal number. The value is unscaled * 10 ^ -scale where:
- * <ul>
- * <li>unscaled is an integer </li>
- * <li>scale is an integer representing how many digits the decimal point should be shifted on the unscaled value</li>
- * </ul>
- * </p>
- * <p>
- * Decimal does not provide a fixed schema because it is parameterized by the scale, which is fixed on the schema
- * rather than being part of the value.
- * </p>
- * <p>
- * The underlying representation of this type is bytes containing a two's complement integer
- * </p>
- */
-public class Decimal {
- public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Decimal";
- public static final String SCALE_FIELD = "scale";
-
- /**
- * Returns a SchemaBuilder for a Decimal with the given scale factor. By returning a SchemaBuilder you can override
- * additional schema settings such as required/optional, default value, and documentation.
- * @param scale the scale factor to apply to unscaled values
- * @return a SchemaBuilder
- */
- public static SchemaBuilder builder(int scale) {
- return SchemaBuilder.bytes()
- .name(LOGICAL_NAME)
- .parameter(SCALE_FIELD, ((Integer) scale).toString())
- .version(1);
- }
-
- public static Schema schema(int scale) {
- return builder(scale).build();
- }
-
- /**
- * Convert a value from its logical format (BigDecimal) to its encoded format.
- * @param value the logical value
- * @return the encoded value
- */
- public static byte[] fromLogical(Schema schema, BigDecimal value) {
- if (value.scale() != scale(schema))
- throw new DataException("BigDecimal has mismatching scale value for given Decimal schema");
- return value.unscaledValue().toByteArray();
- }
-
- public static BigDecimal toLogical(Schema schema, byte[] value) {
- return new BigDecimal(new BigInteger(value), scale(schema));
- }
-
- private static int scale(Schema schema) {
- String scaleString = schema.parameters().get(SCALE_FIELD);
- if (scaleString == null)
- throw new DataException("Invalid Decimal schema: scale parameter not found.");
- try {
- return Integer.parseInt(scaleString);
- } catch (NumberFormatException e) {
- throw new DataException("Invalid scale parameter found in Decimal schema: ", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java
deleted file mode 100644
index c71cdb4..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import java.util.Objects;
-
-/**
- * <p>
- * A field in a {@link Struct}, consisting of a field name, index, and {@link Schema} for the field value.
- * </p>
- */
-public class Field {
- private final String name;
- private final int index;
- private final Schema schema;
-
- public Field(String name, int index, Schema schema) {
- this.name = name;
- this.index = index;
- this.schema = schema;
- }
-
- /**
- * Get the name of this field.
- * @return the name of this field
- */
- public String name() {
- return name;
- }
-
-
- /**
- * Get the index of this field within the struct.
- * @return the index of this field
- */
- public int index() {
- return index;
- }
-
- /**
- * Get the schema of this field
- * @return the schema of values of this field
- */
- public Schema schema() {
- return schema;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- Field field = (Field) o;
- return Objects.equals(index, field.index) &&
- Objects.equals(name, field.name) &&
- Objects.equals(schema, field.schema);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, index, schema);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
deleted file mode 100644
index 3db01ae..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * <p>
- * Definition of an abstract data type. Data types can be primitive types (integer types, floating point types,
- * boolean, strings, and bytes) or complex types (typed arrays, maps with one key schema and value schema,
- * and structs that have a fixed set of field names each with an associated value schema). Any type can be specified
- * as optional, allowing it to be omitted (resulting in null values when it is missing) and can specify a default
- * value.
- * </p>
- * <p>
- * All schemas may have some associated metadata: a name, version, and documentation. These are all considered part
- * of the schema itself and included when comparing schemas. Besides adding important metadata, these fields enable
- * the specification of logical types that specify additional constraints and semantics (e.g. UNIX timestamps are
- * just an int64, but the user needs to know about the additional semantics to interpret it properly).
- * </p>
- * <p>
- * Schemas can be created directly, but in most cases using {@link SchemaBuilder} will be simpler.
- * </p>
- */
-public interface Schema {
- /**
- * The type of a schema. These only include the core types; logical types must be determined by checking the schema name.
- */
- enum Type {
- INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, STRUCT;
-
- private String name;
-
- Type() {
- this.name = this.name().toLowerCase();
- }
-
- public String getName() {
- return name;
- }
-
- public boolean isPrimitive() {
- switch (this) {
- case INT8:
- case INT16:
- case INT32:
- case INT64:
- case FLOAT32:
- case FLOAT64:
- case BOOLEAN:
- case STRING:
- case BYTES:
- return true;
- }
- return false;
- }
- }
-
-
- Schema INT8_SCHEMA = SchemaBuilder.int8().build();
- Schema INT16_SCHEMA = SchemaBuilder.int16().build();
- Schema INT32_SCHEMA = SchemaBuilder.int32().build();
- Schema INT64_SCHEMA = SchemaBuilder.int64().build();
- Schema FLOAT32_SCHEMA = SchemaBuilder.float32().build();
- Schema FLOAT64_SCHEMA = SchemaBuilder.float64().build();
- Schema BOOLEAN_SCHEMA = SchemaBuilder.bool().build();
- Schema STRING_SCHEMA = SchemaBuilder.string().build();
- Schema BYTES_SCHEMA = SchemaBuilder.bytes().build();
-
- Schema OPTIONAL_INT8_SCHEMA = SchemaBuilder.int8().optional().build();
- Schema OPTIONAL_INT16_SCHEMA = SchemaBuilder.int16().optional().build();
- Schema OPTIONAL_INT32_SCHEMA = SchemaBuilder.int32().optional().build();
- Schema OPTIONAL_INT64_SCHEMA = SchemaBuilder.int64().optional().build();
- Schema OPTIONAL_FLOAT32_SCHEMA = SchemaBuilder.float32().optional().build();
- Schema OPTIONAL_FLOAT64_SCHEMA = SchemaBuilder.float64().optional().build();
- Schema OPTIONAL_BOOLEAN_SCHEMA = SchemaBuilder.bool().optional().build();
- Schema OPTIONAL_STRING_SCHEMA = SchemaBuilder.string().optional().build();
- Schema OPTIONAL_BYTES_SCHEMA = SchemaBuilder.bytes().optional().build();
-
- /**
- * @return the type of this schema
- */
- Type type();
-
- /**
- * @return true if this field is optional, false otherwise
- */
- boolean isOptional();
-
- /**
- * @return the default value for this schema
- */
- Object defaultValue();
-
- /**
- * @return the name of this schema
- */
- String name();
-
- /**
- * Get the optional version of the schema. If a version is included, newer versions *must* be larger than older ones.
- * @return the version of this schema
- */
- Integer version();
-
- /**
- * @return the documentation for this schema
- */
- String doc();
-
- /**
- * Get a map of schema parameters.
- * @return Map containing parameters for this schema, or null if there are no parameters
- */
- Map<String, String> parameters();
-
- /**
- * Get the key schema for this map schema. Throws a DataException if this schema is not a map.
- * @return the key schema
- */
- Schema keySchema();
-
- /**
- * Get the value schema for this map or array schema. Throws a DataException if this schema is not a map or array.
- * @return the value schema
- */
- Schema valueSchema();
-
- /**
- * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct.
- * @return the list of fields for this Schema
- */
- List<Field> fields();
-
- /**
- * Get a field for this Schema by name. Throws a DataException if this schema is not a struct.
- * @param fieldName the name of the field to look up
- * @return the Field object for the specified field, or null if there is no field with the given name
- */
- Field field(String fieldName);
-
- /**
- * Return a concrete instance of the {@link Schema}
- * @return the {@link Schema}
- */
- Schema schema();
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java
deleted file mode 100644
index 368a8cf..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import java.util.Objects;
-
-public class SchemaAndValue {
- private final Schema schema;
- private final Object value;
-
- public static final SchemaAndValue NULL = new SchemaAndValue(null, null);
-
- public SchemaAndValue(Schema schema, Object value) {
- this.value = value;
- this.schema = schema;
- }
-
- public Schema schema() {
- return schema;
- }
-
- public Object value() {
- return value;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- SchemaAndValue that = (SchemaAndValue) o;
- return Objects.equals(schema, that.schema) &&
- Objects.equals(value, that.value);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(schema, value);
- }
-
- @Override
- public String toString() {
- return "SchemaAndValue{" +
- "schema=" + schema +
- ", value=" + value +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
deleted file mode 100644
index 21ae54c..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-import org.apache.kafka.copycat.errors.SchemaBuilderException;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * <p>
- * SchemaBuilder provides a fluent API for constructing {@link Schema} objects. It allows you to set each of the
- * properties for the schema and each call returns the SchemaBuilder so the calls can be chained. When nested types
- * are required, use one of the predefined schemas from {@link Schema} or use a second SchemaBuilder inline.
- * </p>
- * <p>
- * Here is an example of building a struct schema:
- * <pre>
- * Schema dateSchema = SchemaBuilder.struct()
- * .name("com.example.CalendarDate").version(2).doc("A calendar date including month, day, and year.")
- * .field("month", Schema.STRING_SCHEMA)
- * .field("day", Schema.INT8_SCHEMA)
- * .field("year", Schema.INT16_SCHEMA)
- * .build();
- * </pre>
- * </p>
- * <p>
- * Here is an example of using a second SchemaBuilder to construct complex, nested types:
- * <pre>
- * Schema userListSchema = SchemaBuilder.array(
- * SchemaBuilder.struct().name("com.example.User").field("username", Schema.STRING_SCHEMA).field("id", Schema.INT64_SCHEMA).build()
- * ).build();
- * </pre>
- * </p>
- */
-public class SchemaBuilder implements Schema {
-    // Property names used in the "has already been set" / "must be specified" error messages.
-    private static final String TYPE_FIELD = "type";
-    private static final String OPTIONAL_FIELD = "optional";
-    private static final String DEFAULT_FIELD = "default";
-    private static final String NAME_FIELD = "name";
-    private static final String VERSION_FIELD = "version";
-    private static final String DOC_FIELD = "doc";
-
-
-    private final Type type;
-    // Stays null until optional() or required() is called; isOptional() reports false in that case.
-    private Boolean optional = null;
-    private Object defaultValue = null;
-
-    // Struct fields in insertion order; stays null until the first field is added.
-    private List<Field> fields = null;
-    private Schema keySchema = null;
-    private Schema valueSchema = null;
-
-    private String name;
-    private Integer version;
-    // Optional human readable documentation describing this schema.
-    private String doc;
-    // Additional parameters for logical types; stays null until the first parameter is set.
-    private Map<String, String> parameters;
-
-    private SchemaBuilder(Type type) {
-        this.type = type;
-    }
-
-    // Common/metadata fields
-
-    @Override
-    public boolean isOptional() {
-        return optional == null ? false : optional;
-    }
-
-    /**
-     * Set this schema as optional. Throws a {@link SchemaBuilderException} if optional/required was already set.
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder optional() {
-        checkNull(OPTIONAL_FIELD, optional);
-        optional = true;
-        return this;
-    }
-
-    /**
-     * Set this schema as required. This is the default, but this method can be used to make this choice explicit.
-     * Throws a {@link SchemaBuilderException} if optional/required was already set.
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder required() {
-        checkNull(OPTIONAL_FIELD, optional);
-        optional = false;
-        return this;
-    }
-
-    @Override
-    public Object defaultValue() {
-        return defaultValue;
-    }
-
-    /**
-     * Set the default value for this schema. The value is validated against the schema type, throwing a
-     * {@link SchemaBuilderException} if it does not match or if a default value has already been set.
-     * @param value the default value
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder defaultValue(Object value) {
-        checkNull(DEFAULT_FIELD, defaultValue);
-        checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
-        try {
-            CopycatSchema.validateValue(this, value);
-        } catch (DataException e) {
-            // Surface validation problems as builder errors, keeping the original failure as the cause.
-            throw new SchemaBuilderException("Invalid default value", e);
-        }
-        defaultValue = value;
-        return this;
-    }
-
-    @Override
-    public String name() {
-        return name;
-    }
-
-    /**
-     * Set the name of this schema. Throws a {@link SchemaBuilderException} if the name has already been set.
-     * @param name the schema name
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder name(String name) {
-        checkNull(NAME_FIELD, this.name);
-        this.name = name;
-        return this;
-    }
-
-    @Override
-    public Integer version() {
-        return version;
-    }
-
-    /**
-     * Set the version of this schema. Schema versions are integers which, if provided, must indicate which schema is
-     * newer and which is older by their ordering. Throws a {@link SchemaBuilderException} if the version has already
-     * been set.
-     * @param version the schema version
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder version(Integer version) {
-        checkNull(VERSION_FIELD, this.version);
-        this.version = version;
-        return this;
-    }
-
-    @Override
-    public String doc() {
-        return doc;
-    }
-
-    /**
-     * Set the documentation for this schema. Throws a {@link SchemaBuilderException} if the documentation has
-     * already been set.
-     * @param doc the documentation
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder doc(String doc) {
-        checkNull(DOC_FIELD, this.doc);
-        this.doc = doc;
-        return this;
-    }
-
-    /**
-     * Get the parameters set on this builder so far.
-     * @return an unmodifiable view of the parameters, or null if no parameter has been set (the same value that
-     *         {@link #build()} hands to the built schema)
-     */
-    @Override
-    public Map<String, String> parameters() {
-        // Collections.unmodifiableMap rejects null, so a builder without parameters must be handled explicitly.
-        return parameters == null ? null : Collections.unmodifiableMap(parameters);
-    }
-
-    /**
-     * Set a schema parameter.
-     * @param propertyName name of the schema property to define
-     * @param propertyValue value of the schema property to define, as a String
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder parameter(String propertyName, String propertyValue) {
-        // Preserve order of insertion with a LinkedHashMap. This isn't strictly necessary, but is nice if logical types
-        // can print their properties in a consistent order.
-        if (parameters == null)
-            parameters = new LinkedHashMap<>();
-        parameters.put(propertyName, propertyValue);
-        return this;
-    }
-
-    /**
-     * Set schema parameters. This operation is additive; it does not remove existing parameters that do not appear in
-     * the set of properties passed to this method.
-     * @param props Map of properties to set
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder parameters(Map<String, String> props) {
-        // Avoid creating an empty set of properties so we never have an empty map
-        if (props.isEmpty())
-            return this;
-        if (parameters == null)
-            parameters = new LinkedHashMap<>();
-        parameters.putAll(props);
-        return this;
-    }
-
-    @Override
-    public Type type() {
-        return type;
-    }
-
-    /**
-     * Create a SchemaBuilder for the specified type.
-     *
-     * Usually it will be simpler to use one of the variants like {@link #string()} or {@link #struct()}, but this form
-     * can be useful when generating schemas dynamically.
-     *
-     * @param type the schema type
-     * @return a new SchemaBuilder
-     */
-    public static SchemaBuilder type(Type type) {
-        return new SchemaBuilder(type);
-    }
-
-    // Primitive types
-
-    /**
-     * @return a new {@link Type#INT8} SchemaBuilder
-     */
-    public static SchemaBuilder int8() {
-        return new SchemaBuilder(Type.INT8);
-    }
-
-    /**
-     * @return a new {@link Type#INT16} SchemaBuilder
-     */
-    public static SchemaBuilder int16() {
-        return new SchemaBuilder(Type.INT16);
-    }
-
-    /**
-     * @return a new {@link Type#INT32} SchemaBuilder
-     */
-    public static SchemaBuilder int32() {
-        return new SchemaBuilder(Type.INT32);
-    }
-
-    /**
-     * @return a new {@link Type#INT64} SchemaBuilder
-     */
-    public static SchemaBuilder int64() {
-        return new SchemaBuilder(Type.INT64);
-    }
-
-    /**
-     * @return a new {@link Type#FLOAT32} SchemaBuilder
-     */
-    public static SchemaBuilder float32() {
-        return new SchemaBuilder(Type.FLOAT32);
-    }
-
-    /**
-     * @return a new {@link Type#FLOAT64} SchemaBuilder
-     */
-    public static SchemaBuilder float64() {
-        return new SchemaBuilder(Type.FLOAT64);
-    }
-
-    /**
-     * @return a new {@link Type#BOOLEAN} SchemaBuilder
-     */
-    public static SchemaBuilder bool() {
-        return new SchemaBuilder(Type.BOOLEAN);
-    }
-
-    /**
-     * @return a new {@link Type#STRING} SchemaBuilder
-     */
-    public static SchemaBuilder string() {
-        return new SchemaBuilder(Type.STRING);
-    }
-
-    /**
-     * @return a new {@link Type#BYTES} SchemaBuilder
-     */
-    public static SchemaBuilder bytes() {
-        return new SchemaBuilder(Type.BYTES);
-    }
-
-
-    // Structs
-
-    /**
-     * @return a new {@link Type#STRUCT} SchemaBuilder
-     */
-    public static SchemaBuilder struct() {
-        return new SchemaBuilder(Type.STRUCT);
-    }
-
-    /**
-     * Add a field to this struct schema. Throws a SchemaBuilderException if this is not a struct schema.
-     * @param fieldName the name of the field to add
-     * @param fieldSchema the Schema for the field's value
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder field(String fieldName, Schema fieldSchema) {
-        if (type != Type.STRUCT)
-            throw new SchemaBuilderException("Cannot create fields on type " + type);
-        if (fields == null)
-            fields = new ArrayList<>();
-        // A field's index is its position in insertion order.
-        int fieldIndex = fields.size();
-        fields.add(new Field(fieldName, fieldIndex, fieldSchema));
-        return this;
-    }
-
-    /**
-     * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct.
-     * @return the list of fields for this Schema, or null if no field has been added yet
-     */
-    @Override
-    public List<Field> fields() {
-        if (type != Type.STRUCT)
-            throw new DataException("Cannot list fields on non-struct type");
-        return fields;
-    }
-
-    /**
-     * Look up a field of this struct schema by name. Throws a DataException if this schema is not a struct.
-     * @param fieldName the name of the field to look up
-     * @return the first field added under that name, or null if there is no such field
-     */
-    @Override
-    public Field field(String fieldName) {
-        if (type != Type.STRUCT)
-            throw new DataException("Cannot look up fields on non-struct type");
-        // No field has been added yet, so there is nothing to match (and nothing to iterate over).
-        if (fields == null)
-            return null;
-        for (Field field : fields) {
-            // Compare names by content; reference comparison (==) misses equal strings that are distinct objects.
-            String candidate = field.name();
-            boolean matches = fieldName == null ? candidate == null : fieldName.equals(candidate);
-            if (matches)
-                return field;
-        }
-        return null;
-    }
-
-
-
-    // Maps & Arrays
-
-    /**
-     * @param valueSchema the schema for elements of the array
-     * @return a new {@link Type#ARRAY} SchemaBuilder
-     */
-    public static SchemaBuilder array(Schema valueSchema) {
-        SchemaBuilder builder = new SchemaBuilder(Type.ARRAY);
-        builder.valueSchema = valueSchema;
-        return builder;
-    }
-
-    /**
-     * @param keySchema the schema for keys in the map
-     * @param valueSchema the schema for values in the map
-     * @return a new {@link Type#MAP} SchemaBuilder
-     */
-    public static SchemaBuilder map(Schema keySchema, Schema valueSchema) {
-        SchemaBuilder builder = new SchemaBuilder(Type.MAP);
-        builder.keySchema = keySchema;
-        builder.valueSchema = valueSchema;
-        return builder;
-    }
-
-    @Override
-    public Schema keySchema() {
-        return keySchema;
-    }
-
-    @Override
-    public Schema valueSchema() {
-        return valueSchema;
-    }
-
-
-    /**
-     * Build the Schema using the current settings. Parameters and fields are passed on as unmodifiable views, or as
-     * null when none have been set.
-     * @return the {@link Schema}
-     */
-    public Schema build() {
-        return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc,
-                parameters == null ? null : Collections.unmodifiableMap(parameters),
-                fields == null ? null : Collections.unmodifiableList(fields), keySchema, valueSchema);
-    }
-
-    /**
-     * Return a concrete instance of the {@link Schema} specified by this builder
-     * @return the {@link Schema}
-     */
-    @Override
-    public Schema schema() {
-        return build();
-    }
-
-
-    // Guard for set-once properties: fails if the property already holds a value.
-    private static void checkNull(String fieldName, Object val) {
-        if (val != null)
-            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " has already been set.");
-    }
-
-    // Guard for prerequisites: fails if fieldName has no value yet while fieldToSet is being set.
-    private static void checkNotNull(String fieldName, Object val, String fieldToSet) {
-        if (val == null)
-            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " must be specified to set " + fieldToSet);
-    }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java
deleted file mode 100644
index 3ab9e7f..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.data.Schema.Type;
-import org.apache.kafka.copycat.errors.SchemaProjectorException;
-
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * <p>
- * SchemaProjector is a utility that projects a value between compatible schemas and throws an exception
- * when incompatible schemas are provided.
- * </p>
- */
-
-public class SchemaProjector {
-
-    // (source type, target type) pairs for which a numeric value may be converted; filled once in the static block.
-    private static final Set<AbstractMap.SimpleImmutableEntry<Type, Type>> promotable = new HashSet<>();
-
-    static {
-        // Each type is promotable to itself and to every type listed after it.
-        Type[] promotableTypes = {Type.INT8, Type.INT16, Type.INT32, Type.INT64, Type.FLOAT32, Type.FLOAT64};
-        for (int i = 0; i < promotableTypes.length; ++i) {
-            for (int j = i; j < promotableTypes.length; ++j) {
-                promotable.add(new AbstractMap.SimpleImmutableEntry<>(promotableTypes[i], promotableTypes[j]));
-            }
-        }
-    }
-
-    /**
-     * Project a value between compatible schemas, throwing an exception when incompatible schemas are provided.
-     * @param source the schema used to construct the record
-     * @param record the value to project from source schema to target schema
-     * @param target the schema to project the record to
-     * @return the projected value with target schema; a null record yields the target's default value when an
-     *         optional source is projected onto a required target, and null otherwise
-     * @throws SchemaProjectorException if the schemas differ in type (and are not promotable), name or parameters,
-     *         or if an optional source is projected onto a required target that has no default value
-     */
-    public static Object project(Schema source, Object record, Schema target) throws SchemaProjectorException {
-        checkMaybeCompatible(source, target);
-        if (source.isOptional() && !target.isOptional()) {
-            // The source may carry nulls that a required target cannot hold, so the target needs a default.
-            if (target.defaultValue() == null)
-                throw new SchemaProjectorException("Writer schema is optional, however, target schema does not provide a default value.");
-            if (record == null)
-                return target.defaultValue();
-        }
-        return record == null ? null : projectRequiredSchema(source, record, target);
-    }
-
-    // Dispatches a non-null record to the projection routine matching the target type.
-    private static Object projectRequiredSchema(Schema source, Object record, Schema target) throws SchemaProjectorException {
-        switch (target.type()) {
-            case INT8:
-            case INT16:
-            case INT32:
-            case INT64:
-            case FLOAT32:
-            case FLOAT64:
-            case BOOLEAN:
-            case BYTES:
-            case STRING:
-                return projectPrimitive(source, record, target);
-            case STRUCT:
-                return projectStruct(source, (Struct) record, target);
-            case ARRAY:
-                return projectArray(source, record, target);
-            case MAP:
-                return projectMap(source, record, target);
-        }
-        return null;
-    }
-
-    // Builds a struct of the target schema, projecting each target field from the source struct. A target field
-    // that the source lacks takes its schema default; if it has none but is optional it is left unset.
-    private static Object projectStruct(Schema source, Struct sourceStruct, Schema target) throws SchemaProjectorException {
-        Struct targetStruct = new Struct(target);
-        for (Field targetField : target.fields()) {
-            String fieldName = targetField.name();
-            Field sourceField = source.field(fieldName);
-            if (sourceField != null) {
-                Object sourceFieldValue = sourceStruct.get(fieldName);
-                try {
-                    Object targetFieldValue = project(sourceField.schema(), sourceFieldValue, targetField.schema());
-                    targetStruct.put(fieldName, targetFieldValue);
-                } catch (SchemaProjectorException e) {
-                    // Add the field name for context and keep the nested failure as the cause.
-                    throw new SchemaProjectorException("Error projecting " + sourceField.name(), e);
-                }
-            } else {
-                Schema targetFieldSchema = targetField.schema();
-                Object targetDefault = targetFieldSchema.defaultValue();
-                if (targetDefault != null) {
-                    targetStruct.put(fieldName, targetDefault);
-                } else if (!targetFieldSchema.isOptional()) {
-                    // A required field without a default cannot be filled in from a source that lacks it.
-                    throw new SchemaProjectorException("Cannot project " + source.schema() + " to " + target.schema());
-                }
-                // Otherwise the field is optional without a default: leave it unset instead of failing.
-            }
-        }
-        return targetStruct;
-    }
-
-
-    // Rejects schema pairs whose types differ without being promotable, or whose names or parameters differ.
-    private static void checkMaybeCompatible(Schema source, Schema target) {
-        if (source.type() != target.type() && !isPromotable(source.type(), target.type())) {
-            throw new SchemaProjectorException("Schema type mismatch. source type: " + source.type() + " and target type: " + target.type());
-        } else if (!Objects.equals(source.name(), target.name())) {
-            throw new SchemaProjectorException("Schema name mismatch. source name: " + source.name() + " and target name: " + target.name());
-        } else if (!Objects.equals(source.parameters(), target.parameters())) {
-            throw new SchemaProjectorException("Schema parameters not equal. source parameters: " + source.parameters() + " and target parameters: " + target.parameters());
-        }
-    }
-
-    // Projects every element of a list record using the source and target element schemas.
-    private static Object projectArray(Schema source, Object record, Schema target) throws SchemaProjectorException {
-        List<?> array = (List<?>) record;
-        List<Object> retArray = new ArrayList<>();
-        for (Object entry : array) {
-            retArray.add(project(source.valueSchema(), entry, target.valueSchema()));
-        }
-        return retArray;
-    }
-
-    // Projects every key and value of a map record using the source and target key/value schemas.
-    private static Object projectMap(Schema source, Object record, Schema target) throws SchemaProjectorException {
-        Map<?, ?> map = (Map<?, ?>) record;
-        Map<Object, Object> retMap = new HashMap<>();
-        for (Map.Entry<?, ?> entry : map.entrySet()) {
-            Object key = entry.getKey();
-            Object value = entry.getValue();
-            Object retKey = project(source.keySchema(), key, target.keySchema());
-            Object retValue = project(source.valueSchema(), value, target.valueSchema());
-            retMap.put(retKey, retValue);
-        }
-        return retMap;
-    }
-
-    // Converts a numeric record to the target's Java type when the type pair is promotable; any other record is
-    // returned unchanged.
-    private static Object projectPrimitive(Schema source, Object record, Schema target) throws SchemaProjectorException {
-        assert source.type().isPrimitive();
-        assert target.type().isPrimitive();
-        Object result;
-        if (isPromotable(source.type(), target.type())) {
-            Number numberRecord = (Number) record;
-            switch (target.type()) {
-                case INT8:
-                    result = numberRecord.byteValue();
-                    break;
-                case INT16:
-                    result = numberRecord.shortValue();
-                    break;
-                case INT32:
-                    result = numberRecord.intValue();
-                    break;
-                case INT64:
-                    result = numberRecord.longValue();
-                    break;
-                case FLOAT32:
-                    result = numberRecord.floatValue();
-                    break;
-                case FLOAT64:
-                    result = numberRecord.doubleValue();
-                    break;
-                default:
-                    throw new SchemaProjectorException("Not promotable type.");
-            }
-        } else {
-            result = record;
-        }
-        return result;
-    }
-
-    // True when (sourceType, targetType) is one of the pairs registered in the static block.
-    private static boolean isPromotable(Type sourceType, Type targetType) {
-        return promotable.contains(new AbstractMap.SimpleImmutableEntry<>(sourceType, targetType));
-    }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java
deleted file mode 100644
index bd757c4..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * <p>
- * A structured record containing a set of named fields with values, each field using an independent {@link Schema}.
- * Struct objects must specify a complete {@link Schema} up front, and only fields specified in the Schema may be set.
- * </p>
- * <p>
- * The Struct's {@link #put(String, Object)} method returns the Struct itself to provide a fluent API for constructing
- * complete objects:
- * <pre>
- * Schema schema = SchemaBuilder.struct().name("com.example.Person")
- * .field("name", Schema.STRING_SCHEMA).field("age", Schema.INT32_SCHEMA).build()
- * Struct struct = new Struct(schema).put("name", "Bobby McGee").put("age", 21)
- * </pre>
- * </p>
- */
-public class Struct {
-
-    private final Schema schema;
-    // One slot per schema field, addressed by Field.index(); null means the field has not been set.
-    private final Object[] values;
-
-    /**
-     * Create a new Struct for this {@link Schema}. Throws a {@link DataException} if the schema is not a struct
-     * schema.
-     * @param schema the {@link Schema} for the Struct
-     */
-    public Struct(Schema schema) {
-        if (schema.type() != Schema.Type.STRUCT)
-            throw new DataException("Not a struct schema: " + schema);
-        this.schema = schema;
-        this.values = new Object[schema.fields().size()];
-    }
-
-    /**
-     * Get the schema for this Struct.
-     * @return the Struct's schema
-     */
-    public Schema schema() {
-        return schema;
-    }
-
-    /**
-     * Get the value of a field, returning the default value if no value has been set yet and a default value is specified
-     * in the field's schema. Because this handles fields of all types, the value is returned as an {@link Object} and
-     * must be cast to a more specific type. Throws a {@link DataException} if the schema has no field of that name.
-     * @param fieldName the field name to lookup
-     * @return the value for the field
-     */
-    public Object get(String fieldName) {
-        Field field = lookupField(fieldName);
-        return get(field);
-    }
-
-    /**
-     * Get the value of a field, returning the default value if no value has been set yet and a default value is specified
-     * in the field's schema. Because this handles fields of all types, the value is returned as an {@link Object} and
-     * must be cast to a more specific type.
-     * @param field the field to lookup
-     * @return the value for the field
-     */
-    public Object get(Field field) {
-        Object val = values[field.index()];
-        if (val == null) {
-            // The fallback is the default of the field's own schema, not that of the enclosing struct schema.
-            Object fieldDefault = field.schema().defaultValue();
-            if (fieldDefault != null)
-                val = fieldDefault;
-        }
-        return val;
-    }
-
-    /**
-     * Get the underlying raw value for the field without accounting for default values.
-     * @param fieldName the field to get the value of
-     * @return the raw value
-     */
-    public Object getWithoutDefault(String fieldName) {
-        Field field = lookupField(fieldName);
-        return values[field.index()];
-    }
-
-    // Note that all getters have to have boxed return types since the fields might be optional.
-    // The typed getters below return the stored value as-is: unlike get(String) they do not substitute the
-    // field schema's default for an unset field. Each throws a DataException if the field does not exist or is
-    // not of the expected schema type.
-
-    /**
-     * Get the stored value of an {@link Schema.Type#INT8} field as a Byte.
-     */
-    public Byte getInt8(String fieldName) {
-        return (Byte) getCheckType(fieldName, Schema.Type.INT8);
-    }
-
-    /**
-     * Get the stored value of an {@link Schema.Type#INT16} field as a Short.
-     */
-    public Short getInt16(String fieldName) {
-        return (Short) getCheckType(fieldName, Schema.Type.INT16);
-    }
-
-    /**
-     * Get the stored value of an {@link Schema.Type#INT32} field as an Integer.
-     */
-    public Integer getInt32(String fieldName) {
-        return (Integer) getCheckType(fieldName, Schema.Type.INT32);
-    }
-
-    /**
-     * Get the stored value of an {@link Schema.Type#INT64} field as a Long.
-     */
-    public Long getInt64(String fieldName) {
-        return (Long) getCheckType(fieldName, Schema.Type.INT64);
-    }
-
-    /**
-     * Get the stored value of a {@link Schema.Type#FLOAT32} field as a Float.
-     */
-    public Float getFloat32(String fieldName) {
-        return (Float) getCheckType(fieldName, Schema.Type.FLOAT32);
-    }
-
-    /**
-     * Get the stored value of a {@link Schema.Type#FLOAT64} field as a Double.
-     */
-    public Double getFloat64(String fieldName) {
-        return (Double) getCheckType(fieldName, Schema.Type.FLOAT64);
-    }
-
-    /**
-     * Get the stored value of a {@link Schema.Type#BOOLEAN} field as a Boolean.
-     */
-    public Boolean getBoolean(String fieldName) {
-        return (Boolean) getCheckType(fieldName, Schema.Type.BOOLEAN);
-    }
-
-    /**
-     * Get the stored value of a {@link Schema.Type#STRING} field as a String.
-     */
-    public String getString(String fieldName) {
-        return (String) getCheckType(fieldName, Schema.Type.STRING);
-    }
-
-    /**
-     * Get the stored value of a {@link Schema.Type#BYTES} field as a byte[]. A value stored as a
-     * {@link ByteBuffer} is returned as that buffer's backing array.
-     */
-    public byte[] getBytes(String fieldName) {
-        Object bytes = getCheckType(fieldName, Schema.Type.BYTES);
-        // NOTE(review): ByteBuffer.array() exposes the whole backing array regardless of position/limit and
-        // throws for read-only or non-array-backed buffers - confirm callers only store plain wrapped arrays.
-        if (bytes instanceof ByteBuffer)
-            return ((ByteBuffer) bytes).array();
-        return (byte[]) bytes;
-    }
-
-    /**
-     * Get the stored value of a {@link Schema.Type#ARRAY} field as a List. The element type is not checked.
-     */
-    @SuppressWarnings("unchecked")
-    public <T> List<T> getArray(String fieldName) {
-        return (List<T>) getCheckType(fieldName, Schema.Type.ARRAY);
-    }
-
-    /**
-     * Get the stored value of a {@link Schema.Type#MAP} field as a Map. The key and value types are not checked.
-     */
-    @SuppressWarnings("unchecked")
-    public <K, V> Map<K, V> getMap(String fieldName) {
-        return (Map<K, V>) getCheckType(fieldName, Schema.Type.MAP);
-    }
-
-    /**
-     * Get the stored value of a {@link Schema.Type#STRUCT} field as a Struct.
-     */
-    public Struct getStruct(String fieldName) {
-        return (Struct) getCheckType(fieldName, Schema.Type.STRUCT);
-    }
-
-    /**
-     * Set the value of a field. Validates the value, throwing a {@link DataException} if it does not match the field's
-     * {@link Schema}.
-     * @param fieldName the name of the field to set
-     * @param value the value of the field
-     * @return the Struct, to allow chaining of {@link #put(String, Object)} calls
-     */
-    public Struct put(String fieldName, Object value) {
-        Field field = lookupField(fieldName);
-        return put(field, value);
-    }
-
-    /**
-     * Set the value of a field. Validates the value, throwing a {@link DataException} if it does not match the field's
-     * {@link Schema}.
-     * @param field the field to set
-     * @param value the value of the field
-     * @return the Struct, to allow chaining of {@link #put(String, Object)} calls
-     */
-    public Struct put(Field field, Object value) {
-        CopycatSchema.validateValue(field.schema(), value);
-        values[field.index()] = value;
-        return this;
-    }
-
-
-    /**
-     * Validates that this struct has filled in all the necessary data with valid values. For required fields
-     * without defaults, this validates that a value has been set and has matching types/schemas. If any validation
-     * fails, throws a DataException.
-     */
-    public void validate() {
-        for (Field field : schema.fields()) {
-            Schema fieldSchema = field.schema();
-            Object value = values[field.index()];
-            // An unset field is acceptable when it is optional or its schema supplies a default.
-            if (value == null && (fieldSchema.isOptional() || fieldSchema.defaultValue() != null))
-                continue;
-            CopycatSchema.validateValue(fieldSchema, value);
-        }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        Struct struct = (Struct) o;
-        return Objects.equals(schema, struct.schema) &&
-                Arrays.equals(values, struct.values);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(schema, Arrays.hashCode(values));
-    }
-
-    // Resolve a field by name against the struct's schema, failing if the schema has no such field.
-    private Field lookupField(String fieldName) {
-        Field field = schema.field(fieldName);
-        if (field == null)
-            throw new DataException(fieldName + " is not a valid field name");
-        return field;
-    }
-
-    // Get the field's value, but also check that the field matches the specified type, throwing an exception if it doesn't.
-    // Used to implement the get*() methods that return typed data instead of Object
-    private Object getCheckType(String fieldName, Schema.Type type) {
-        Field field = lookupField(fieldName);
-        if (field.schema().type() != type)
-            throw new DataException("Field '" + fieldName + "' is not of type " + type);
-        return values[field.index()];
-    }
-
-}
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java
deleted file mode 100644
index e3255e0..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.util.Calendar;
-import java.util.TimeZone;
-
-/**
- * <p>
- * A time representing a specific point in a day, not tied to any specific date. The corresponding Java type is a
- * java.util.Date where only hours, minutes, seconds, and milliseconds can be non-zero. This effectively makes it a
- * point in time during the first day after the Unix epoch. The underlying representation is an integer
- * representing the number of milliseconds after midnight.
- * </p>
- */
-public class Time {
-    public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Time";
-
-    // Exclusive upper bound for an encoded value: 86400000 ms is already midnight of the following day.
-    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
-
-    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-
-    /**
-     * Returns a SchemaBuilder for a Time. By returning a SchemaBuilder you can override additional schema settings such
-     * as required/optional, default value, and documentation.
-     * @return a SchemaBuilder
-     */
-    public static SchemaBuilder builder() {
-        return SchemaBuilder.int32()
-                .name(LOGICAL_NAME)
-                .version(1);
-    }
-
-    public static final Schema SCHEMA = builder().schema();
-
-    /**
-     * Convert a value from its logical format (Time) to its encoded format.
-     * @param schema the schema of the value; its name must be {@link #LOGICAL_NAME}
-     * @param value the logical value
-     * @return the encoded value, the number of milliseconds after midnight
-     * @throws DataException if the schema name does not match or the value does not fall within the first day
-     *         after the Unix epoch
-     */
-    public static int fromLogical(Schema schema, java.util.Date value) {
-        if (!LOGICAL_NAME.equals(schema.name()))
-            throw new DataException("Requested conversion of Time object but the schema does not match.");
-        Calendar calendar = Calendar.getInstance(UTC);
-        calendar.setTime(value);
-        long unixMillis = calendar.getTimeInMillis();
-        // The upper bound is exclusive: exactly one day of milliseconds would be a non-zero date field.
-        if (unixMillis < 0 || unixMillis >= MILLIS_PER_DAY) {
-            throw new DataException("Copycat Time type should not have any date fields set to non-zero values.");
-        }
-        return (int) unixMillis;
-    }
-
-    /**
-     * Convert a value from its encoded format to its logical format (Time).
-     * @param schema the schema of the value; its name must be {@link #LOGICAL_NAME}
-     * @param value the encoded value, the number of milliseconds after midnight
-     * @return the logical value
-     * @throws DataException if the schema name does not match or the value is negative or not less than 86400000
-     */
-    public static java.util.Date toLogical(Schema schema, int value) {
-        if (!LOGICAL_NAME.equals(schema.name()))
-            throw new DataException("Requested conversion of Date object but the schema does not match.");
-        // Same exclusive upper bound as fromLogical, matching the "less than 86400000" wording of the message.
-        if (value < 0 || value >= MILLIS_PER_DAY)
-            throw new DataException("Time values must use number of milliseconds greater than 0 and less than 86400000");
-        return new java.util.Date(value);
-    }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java
deleted file mode 100644
index 62d371c..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.util.TimeZone;
-
-/**
- * <p>
- * Logical type for an absolute point in time that carries no timezone information. In Java the value is a
- * java.util.Date; the encoded form is an int64 (long) holding the number of milliseconds since the Unix epoch.
- * </p>
- */
-public class Timestamp {
-    public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Timestamp";
-
-    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-
-    /**
-     * Returns a SchemaBuilder for a Timestamp. Because a builder is returned rather than a finished schema, callers
-     * can still override settings such as required/optional, default value, and documentation.
-     * @return a SchemaBuilder
-     */
-    public static SchemaBuilder builder() {
-        return SchemaBuilder.int64().name(LOGICAL_NAME).version(1);
-    }
-
-    /** Schema obtained from {@link #builder()} with no additional settings applied. */
-    public static final Schema SCHEMA = builder().schema();
-
-    /**
-     * Convert a value from its logical format (Timestamp, supplied as a java.util.Date) to its encoded format.
-     * @param schema the schema of the value; its name must equal LOGICAL_NAME
-     * @param value the logical value
-     * @return the encoded value, i.e. the result of value.getTime()
-     * @throws DataException if the schema name does not match
-     */
-    public static long fromLogical(Schema schema, java.util.Date value) {
-        requireTimestampSchema(schema);
-        return value.getTime();
-    }
-
-    /**
-     * Convert a value from its encoded format (long milliseconds) to its logical format (java.util.Date).
-     * @param schema the schema of the value; its name must equal LOGICAL_NAME
-     * @param value the encoded value
-     * @return a java.util.Date constructed from the encoded milliseconds
-     * @throws DataException if the schema name does not match
-     */
-    public static java.util.Date toLogical(Schema schema, long value) {
-        requireTimestampSchema(schema);
-        return new java.util.Date(value);
-    }
-
-    // Shared guard for both conversions: rejects schemas whose name is null or differs from LOGICAL_NAME.
-    private static void requireTimestampSchema(Schema schema) {
-        boolean isTimestampSchema = schema.name() != null && schema.name().equals(LOGICAL_NAME);
-        if (!isTimestampSchema)
-            throw new DataException("Requested conversion of Timestamp object but the schema does not match.");
-    }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java
deleted file mode 100644
index c8f1bad..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.errors;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-/**
- * Top-level exception type for errors generated by Copycat and by connectors. It extends KafkaException and
- * forwards every constructor argument to it unchanged.
- */
-@InterfaceStability.Unstable
-public class CopycatException extends KafkaException {
-
-    /**
-     * @param message the message passed to the superclass constructor
-     */
-    public CopycatException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param message the message passed to the superclass constructor
-     * @param cause the throwable passed to the superclass constructor
-     */
-    public CopycatException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param cause the throwable passed to the superclass constructor
-     */
-    public CopycatException(Throwable cause) {
-        super(cause);
-    }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java
deleted file mode 100644
index 11139a4..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.errors;
-
-/**
- * Common parent of the exceptions raised by the Copycat data API. Each constructor forwards its arguments to
- * CopycatException unchanged.
- */
-public class DataException extends CopycatException {
-    /**
-     * @param message the message passed to the superclass constructor
-     */
-    public DataException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param message the message passed to the superclass constructor
-     * @param cause the throwable passed to the superclass constructor
-     */
-    public DataException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param cause the throwable passed to the superclass constructor
-     */
-    public DataException(Throwable cause) {
-        super(cause);
-    }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java
deleted file mode 100644
index 6f9f233..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.errors;
-
-/**
- * Signals that a connector or task invoked a method illegally or at an invalid time. Each constructor forwards
- * its arguments to CopycatException unchanged.
- */
-public class IllegalWorkerStateException extends CopycatException {
-    /**
-     * @param message the message passed to the superclass constructor
-     */
-    public IllegalWorkerStateException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param message the message passed to the superclass constructor
-     * @param cause the throwable passed to the superclass constructor
-     */
-    public IllegalWorkerStateException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param cause the throwable passed to the superclass constructor
-     */
-    public IllegalWorkerStateException(Throwable cause) {
-        super(cause);
-    }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java
deleted file mode 100644
index b5a93af..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.errors;
-
-/**
- * A DataException subtype named for schema builder failures. Each constructor forwards its arguments to
- * DataException unchanged.
- */
-public class SchemaBuilderException extends DataException {
-    /**
-     * @param message the message passed to the superclass constructor
-     */
-    public SchemaBuilderException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param message the message passed to the superclass constructor
-     * @param cause the throwable passed to the superclass constructor
-     */
-    public SchemaBuilderException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param cause the throwable passed to the superclass constructor
-     */
-    public SchemaBuilderException(Throwable cause) {
-        super(cause);
-    }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java
deleted file mode 100644
index be21418..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- **/
-package org.apache.kafka.copycat.errors;
-
-/**
- * A DataException subtype named for schema projection failures. Each constructor forwards its arguments to
- * DataException unchanged.
- */
-public class SchemaProjectorException extends DataException {
-    /**
-     * @param message the message passed to the superclass constructor
-     */
-    public SchemaProjectorException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param message the message passed to the superclass constructor
-     * @param cause the throwable passed to the superclass constructor
-     */
-    public SchemaProjectorException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param cause the throwable passed to the superclass constructor
-     */
-    public SchemaProjectorException(Throwable cause) {
-        super(cause);
-    }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java
deleted file mode 100644
index fb2e694..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.sink;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.connector.Connector;
-
-/**
- * Base class for Connectors that send Kafka data to another system.
- */
-@InterfaceStability.Unstable
-public abstract class SinkConnector extends Connector {
-
-    /**
-     * <p>
-     * Configuration key under which the list of input topics for this connector is supplied.
-     * </p>
-     * <p>
-     * The setting is usually relevant only to the Copycat framework; it is exposed here so that
-     * Connector developers who also need to know the set of topics can refer to it.
-     * </p>
-     */
-    public static final String TOPICS_CONFIG = "topics";
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
deleted file mode 100644
index 79ac725..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.sink;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.connector.CopycatRecord;
-import org.apache.kafka.copycat.data.Schema;
-
-/**
- * A CopycatRecord that has been read from Kafka. In addition to the standard fields it carries the kafkaOffset of
- * the record within its Kafka topic-partition, which the SinkTask should use to coordinate kafkaOffset commits.
- */
-@InterfaceStability.Unstable
-public class SinkRecord extends CopycatRecord {
-    private final long kafkaOffset;
-
-    /**
-     * Creates a record; every argument except kafkaOffset is handed to the CopycatRecord constructor.
-     * @param kafkaOffset the offset stored on this record and returned by {@link #kafkaOffset()}
-     */
-    public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset) {
-        super(topic, partition, keySchema, key, valueSchema, value);
-        this.kafkaOffset = kafkaOffset;
-    }
-
-    /**
-     * @return the kafkaOffset given at construction
-     */
-    public long kafkaOffset() {
-        return kafkaOffset;
-    }
-
-    /**
-     * Two records are equal when they are the same class, the superclass considers them equal, and their
-     * kafkaOffsets match.
-     */
-    @Override
-    public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        // Short-circuit order matters: null and class checks come before delegating to the superclass and casting.
-        if (o == null || getClass() != o.getClass() || !super.equals(o))
-            return false;
-        return kafkaOffset == ((SinkRecord) o).kafkaOffset;
-    }
-
-    /**
-     * Combines the superclass hash with the kafkaOffset folded from 64 to 32 bits.
-     */
-    @Override
-    public int hashCode() {
-        return 31 * super.hashCode() + (int) (kafkaOffset ^ (kafkaOffset >>> 32));
-    }
-
-    /**
-     * @return the kafkaOffset rendering followed by the superclass's toString()
-     */
-    @Override
-    public String toString() {
-        StringBuilder text = new StringBuilder("SinkRecord{");
-        text.append("kafkaOffset=").append(kafkaOffset);
-        text.append("} ").append(super.toString());
-        return text.toString();
-    }
-}