You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/31 21:26:25 UTC
[1/3] kafka git commit: KAFKA-2475: Make Copycat only have a
Converter class instead of Serializer, Deserializer, and Converter.
Repository: kafka
Updated Branches:
refs/heads/trunk 9c936b186 -> 3803e5cb3
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/tests/kafkatest/tests/copycat_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_test.py b/tests/kafkatest/tests/copycat_test.py
index 344f7ef..b4adf53 100644
--- a/tests/kafkatest/tests/copycat_test.py
+++ b/tests/kafkatest/tests/copycat_test.py
@@ -15,8 +15,10 @@
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.copycat import CopycatStandaloneService
+from kafkatest.services.console_consumer import ConsoleConsumer
from ducktape.utils.util import wait_until
-import hashlib, subprocess
+from ducktape.mark import parametrize
+import hashlib, subprocess, json
class CopycatStandaloneFileTest(KafkaTest):
"""
@@ -30,8 +32,14 @@ class CopycatStandaloneFileTest(KafkaTest):
OFFSETS_FILE = "/mnt/copycat.offsets"
- FIRST_INPUT = "foo\nbar\nbaz\n"
- SECOND_INPUT = "razz\nma\ntazz\n"
+ TOPIC = "test"
+
+ FIRST_INPUT_LIST = ["foo", "bar", "baz"]
+ FIRST_INPUT = "\n".join(FIRST_INPUT_LIST) + "\n"
+ SECOND_INPUT_LIST = ["razz", "ma", "tazz"]
+ SECOND_INPUT = "\n".join(SECOND_INPUT_LIST) + "\n"
+
+ SCHEMA = { "type": "string", "optional": False }
def __init__(self, test_context):
super(CopycatStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
@@ -40,8 +48,18 @@ class CopycatStandaloneFileTest(KafkaTest):
self.source = CopycatStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
self.sink = CopycatStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+ self.consumer_validator = ConsoleConsumer(test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000)
+
+ @parametrize(converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True)
+ @parametrize(converter="org.apache.kafka.copycat.json.JsonConverter", schemas=False)
+ @parametrize(converter="org.apache.kafka.copycat.storage.StringConverter", schemas=None)
+ def test_file_source_and_sink(self, converter="org.apache.kafka.json.JsonConverter", schemas=True):
+ assert converter != None, "converter type must be set"
+ # Template parameters
+ self.key_converter = converter
+ self.value_converter = converter
+ self.schemas = schemas
- def test_file_source_and_sink(self):
# These need to be set
self.source.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-source.properties"))
self.sink.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-sink.properties"))
@@ -61,6 +79,13 @@ class CopycatStandaloneFileTest(KafkaTest):
self.source.node.account.ssh("echo -e -n " + repr(self.SECOND_INPUT) + " >> " + self.INPUT_FILE)
wait_until(lambda: self.validate_output(self.FIRST_INPUT + self.SECOND_INPUT), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
+ # Validate the format of the data in the Kafka topic
+ self.consumer_validator.run()
+ expected = json.dumps([line if not self.schemas else { "schema": self.SCHEMA, "payload": line } for line in self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST])
+ decoder = (json.loads if converter.endswith("JsonConverter") else str)
+ actual = json.dumps([decoder(x) for x in self.consumer_validator.messages_consumed[1]])
+ assert expected == actual, "Expected %s but saw %s in Kafka" % (expected, actual)
+
def validate_output(self, value):
try:
output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0]
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/tests/kafkatest/tests/templates/copycat-file-sink.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-file-sink.properties b/tests/kafkatest/tests/templates/copycat-file-sink.properties
index c7865a6..77c43c7 100644
--- a/tests/kafkatest/tests/templates/copycat-file-sink.properties
+++ b/tests/kafkatest/tests/templates/copycat-file-sink.properties
@@ -17,4 +17,4 @@ name=local-file-sink
connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
tasks.max=1
file={{ OUTPUT_FILE }}
-topics=test
\ No newline at end of file
+topics={{ TOPIC }}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/tests/kafkatest/tests/templates/copycat-file-source.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-file-source.properties b/tests/kafkatest/tests/templates/copycat-file-source.properties
index 8612ed7..68dabc2 100644
--- a/tests/kafkatest/tests/templates/copycat-file-source.properties
+++ b/tests/kafkatest/tests/templates/copycat-file-source.properties
@@ -17,4 +17,4 @@ name=local-file-source
connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
tasks.max=1
file={{ INPUT_FILE }}
-topic=test
\ No newline at end of file
+topic={{ TOPIC }}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/tests/kafkatest/tests/templates/copycat-standalone.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-standalone.properties b/tests/kafkatest/tests/templates/copycat-standalone.properties
index 5ffb487..39db6ce 100644
--- a/tests/kafkatest/tests/templates/copycat-standalone.properties
+++ b/tests/kafkatest/tests/templates/copycat-standalone.properties
@@ -15,11 +15,18 @@
bootstrap.servers={{ kafka.bootstrap_servers() }}
-key.converter=org.apache.kafka.copycat.json.JsonConverter
-value.converter=org.apache.kafka.copycat.json.JsonConverter
-key.serializer=org.apache.kafka.copycat.json.JsonSerializer
-value.serializer=org.apache.kafka.copycat.json.JsonSerializer
-key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
-value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
+key.converter={{ key_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
+value.converter={{ value_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
+{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}
+key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+{% if value_converter is not defined or value_converter.endswith("JsonConverter") %}
+value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+
+offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
+offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
+offset.key.converter.schemas.enable=false
+offset.value.converter.schemas.enable=false
offset.storage.file.filename={{ OFFSETS_FILE }}
[3/3] kafka git commit: KAFKA-2475: Make Copycat only have a
Converter class instead of Serializer, Deserializer, and Converter.
Posted by gw...@apache.org.
KAFKA-2475: Make Copycat only have a Converter class instead of Serializer, Deserializer, and Converter.
The Converter class now translates directly between byte[] and Copycat's data
API instead of requiring an intermediate runtime type like Avro's GenericRecord
or Jackson's JsonNode.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Gwen Shapira
Closes #172 from ewencp/kafka-2475-unified-serializer-converter and squashes the following commits:
566c52f [Ewen Cheslack-Postava] Checkstyle fixes
320d0df [Ewen Cheslack-Postava] Restrict offset format.
85797e7 [Ewen Cheslack-Postava] Add StringConverter for using Copycat with raw strings.
698d65c [Ewen Cheslack-Postava] Move and update outdated comment about handing of types for BYTES type in Copycat.
4bed051 [Ewen Cheslack-Postava] KAFKA-2475: Make Copycat only have a Converter class instead of Serializer, Deserializer, and Converter.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3803e5cb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3803e5cb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3803e5cb
Branch: refs/heads/trunk
Commit: 3803e5cb37cb602ff9eab5562ff8db3a2dd79b45
Parents: 9c936b1
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Mon Aug 31 12:26:16 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Aug 31 12:26:16 2015 -0700
----------------------------------------------------------------------
.../kafka/common/config/AbstractConfig.java | 15 +
.../kafka/common/config/AbstractConfigTest.java | 13 +
config/copycat-standalone.properties | 17 +-
.../kafka/copycat/data/CopycatSchema.java | 92 ++++--
.../org/apache/kafka/copycat/data/Schema.java | 1 -
.../kafka/copycat/data/SchemaBuilder.java | 3 +-
.../kafka/copycat/source/SourceRecord.java | 41 +--
.../apache/kafka/copycat/storage/Converter.java | 23 +-
.../copycat/storage/OffsetStorageReader.java | 13 +-
.../kafka/copycat/storage/StringConverter.java | 81 ++++++
.../copycat/storage/StringConverterTest.java | 83 ++++++
.../copycat/file/FileStreamSourceTask.java | 33 ++-
.../copycat/file/FileStreamSourceTaskTest.java | 23 +-
.../kafka/copycat/json/JsonConverter.java | 281 +++++++++++++------
.../kafka/copycat/json/JsonDeserializer.java | 31 +-
.../kafka/copycat/json/JsonSerializer.java | 15 -
.../kafka/copycat/json/JsonConverterTest.java | 243 ++++++++++++----
.../apache/kafka/copycat/cli/WorkerConfig.java | 35 +--
.../apache/kafka/copycat/runtime/Worker.java | 72 ++---
.../kafka/copycat/runtime/WorkerSinkTask.java | 30 +-
.../kafka/copycat/runtime/WorkerSourceTask.java | 39 ++-
.../storage/OffsetStorageReaderImpl.java | 39 ++-
.../copycat/storage/OffsetStorageWriter.java | 37 ++-
.../kafka/copycat/storage/OffsetUtils.java | 46 +++
.../copycat/runtime/WorkerSinkTaskTest.java | 26 +-
.../copycat/runtime/WorkerSourceTaskTest.java | 60 ++--
.../kafka/copycat/runtime/WorkerTest.java | 18 +-
.../storage/OffsetStorageWriterTest.java | 42 +--
tests/kafkatest/tests/copycat_test.py | 33 ++-
.../templates/copycat-file-sink.properties | 2 +-
.../templates/copycat-file-source.properties | 2 +-
.../templates/copycat-standalone.properties | 19 +-
32 files changed, 991 insertions(+), 517 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 774701a..12a1927 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -115,6 +115,21 @@ public class AbstractConfig {
return copy;
}
+ /**
+ * Gets all original settings with the given prefix, stripping the prefix before adding it to the output.
+ *
+ * @param prefix the prefix to use as a filter
+ * @return a Map containing the settings with the prefix, keyed by setting name with the prefix removed
+ */
+ public Map<String, Object> originalsWithPrefix(String prefix) {
+ Map<String, Object> result = new HashMap<String, Object>();
+ for (Map.Entry<String, ?> entry : originals.entrySet()) {
+ if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length())
+ result.put(entry.getKey().substring(prefix.length()), entry.getValue());
+ }
+ return result;
+ }
+
public Map<String, ?> values() {
return new HashMap<String, Object>(values);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index db1b0ee..28064ec 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -17,10 +17,12 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.junit.Test;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
public class AbstractConfigTest {
@@ -35,6 +37,17 @@ public class AbstractConfigTest {
testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
}
+ @Test
+ public void testOriginalsWithPrefix() {
+ Properties props = new Properties();
+ props.put("foo.bar", "abc");
+ props.put("setting", "def");
+ TestConfig config = new TestConfig(props);
+ Map<String, Object> expected = new HashMap<>();
+ expected.put("bar", "abc");
+ assertEquals(expected, config.originalsWithPrefix("foo."));
+ }
+
private void testValidInputs(String configValue) {
Properties props = new Properties();
props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/config/copycat-standalone.properties
----------------------------------------------------------------------
diff --git a/config/copycat-standalone.properties b/config/copycat-standalone.properties
index cf3b268..fd264b5 100644
--- a/config/copycat-standalone.properties
+++ b/config/copycat-standalone.properties
@@ -16,12 +16,21 @@
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
+# The converters specify the format of data in Kafka and how to translate it into Copycat data. Every Copycat user will
+# need to configure these based on the format they want their data in when loaded from or stored into Kafka.
key.converter=org.apache.kafka.copycat.json.JsonConverter
value.converter=org.apache.kafka.copycat.json.JsonConverter
-key.serializer=org.apache.kafka.copycat.json.JsonSerializer
-value.serializer=org.apache.kafka.copycat.json.JsonSerializer
-key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
-value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
+# Converter-specific settings can be passed in by prefixing the Converter's setting name with the converter it should
+# apply to, e.g. key.converter.schemas.enable sets schemas.enable on the key converter
+key.converter.schemas.enable=true
+value.converter.schemas.enable=true
+
+# The offset converter is configurable and must be specified, but most users will always want to use the built-in default.
+# Offset data is never visible outside of Copycat.
+offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
+offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
+offset.key.converter.schemas.enable=false
+offset.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/copycat.offsets
# Flush much faster than normal, which is useful for testing/debugging
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
index c823f28..809496a 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
@@ -23,20 +23,37 @@ import java.nio.ByteBuffer;
import java.util.*;
public class CopycatSchema implements Schema {
- private static final Map<Type, Class<?>> SCHEMA_TYPE_CLASSES = new HashMap<>();
+ /**
+ * Maps Schema.Types to a list of Java classes that can be used to represent them.
+ */
+ private static final Map<Type, List<Class>> SCHEMA_TYPE_CLASSES = new HashMap<>();
+
+ /**
+ * Maps the Java classes to the corresponding Schema.Type.
+ */
+ private static final Map<Class<?>, Type> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>();
+
static {
- SCHEMA_TYPE_CLASSES.put(Type.INT8, Byte.class);
- SCHEMA_TYPE_CLASSES.put(Type.INT16, Short.class);
- SCHEMA_TYPE_CLASSES.put(Type.INT32, Integer.class);
- SCHEMA_TYPE_CLASSES.put(Type.INT64, Long.class);
- SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Float.class);
- SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Double.class);
- SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Boolean.class);
- SCHEMA_TYPE_CLASSES.put(Type.STRING, String.class);
- SCHEMA_TYPE_CLASSES.put(Type.ARRAY, List.class);
- SCHEMA_TYPE_CLASSES.put(Type.MAP, Map.class);
- SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Struct.class);
- // Bytes are handled as a special case
+ SCHEMA_TYPE_CLASSES.put(Type.INT8, Arrays.asList((Class) Byte.class));
+ SCHEMA_TYPE_CLASSES.put(Type.INT16, Arrays.asList((Class) Short.class));
+ SCHEMA_TYPE_CLASSES.put(Type.INT32, Arrays.asList((Class) Integer.class));
+ SCHEMA_TYPE_CLASSES.put(Type.INT64, Arrays.asList((Class) Long.class));
+ SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Arrays.asList((Class) Float.class));
+ SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Arrays.asList((Class) Double.class));
+ SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Arrays.asList((Class) Boolean.class));
+ SCHEMA_TYPE_CLASSES.put(Type.STRING, Arrays.asList((Class) String.class));
+ // Bytes are special and have 2 representations: byte[] and ByteBuffer. byte[] inherits identity-based equals() and
+ // hashCode() from Object rather than comparing contents, so values containing plain byte[] can fail equality checks
+ // and hash lookups. Both are accepted, but ByteBuffers are recommended.
+ SCHEMA_TYPE_CLASSES.put(Type.BYTES, Arrays.asList((Class) byte[].class, (Class) ByteBuffer.class));
+ SCHEMA_TYPE_CLASSES.put(Type.ARRAY, Arrays.asList((Class) List.class));
+ SCHEMA_TYPE_CLASSES.put(Type.MAP, Arrays.asList((Class) Map.class));
+ SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Arrays.asList((Class) Struct.class));
+
+ for (Map.Entry<Type, List<Class>> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) {
+ for (Class<?> schemaClass : schemaClasses.getValue())
+ JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey());
+ }
}
// The type of the field
@@ -158,14 +175,19 @@ public class CopycatSchema implements Schema {
return;
}
- // Special case for bytes. byte[] causes problems because it doesn't handle equals()/hashCode() like we want
- // objects to, so we support both byte[] and ByteBuffer. Using plain byte[] can cause those methods to fail, so
- // ByteBuffers are recommended
- if (schema.type() == Type.BYTES && (value instanceof byte[] || value instanceof ByteBuffer))
- return;
- Class<?> expectedClass = SCHEMA_TYPE_CLASSES.get(schema.type());
- if (expectedClass == null || !expectedClass.isInstance(value))
- throw new DataException("Invalid value: expected " + expectedClass + " for type " + schema.type() + " but tried to use " + value.getClass());
+ final List<Class> expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
+ if (expectedClasses == null)
+ throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass());
+
+ boolean foundMatch = false;
+ for (Class<?> expectedClass : expectedClasses) {
+ if (expectedClass.isInstance(value)) {
+ foundMatch = true;
+ break;
+ }
+ }
+ if (!foundMatch)
+ throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass());
switch (schema.type()) {
case STRUCT:
@@ -232,4 +254,32 @@ public class CopycatSchema implements Schema {
else
return "Schema{" + type + "}";
}
+
+
+ /**
+ * Get the {@link Type} associated with the given class.
+ *
+ * @param klass the Class to look up
+ * @return the corresponding type, or null if there is no matching type
+ */
+ public static Type schemaType(Class<?> klass) {
+ synchronized (JAVA_CLASS_SCHEMA_TYPES) {
+ Type schemaType = JAVA_CLASS_SCHEMA_TYPES.get(klass);
+ if (schemaType != null)
+ return schemaType;
+
+ // The direct lookup only matches exact classes, so also accept subclasses/implementations (e.g. ArrayList for List)
+ for (Map.Entry<Class<?>, Type> entry : JAVA_CLASS_SCHEMA_TYPES.entrySet()) {
+ try {
+ klass.asSubclass(entry.getKey());
+ // Cache this for subsequent lookups
+ JAVA_CLASS_SCHEMA_TYPES.put(klass, entry.getValue());
+ return entry.getValue();
+ } catch (ClassCastException e) {
+ // Expected, ignore
+ }
+ }
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
index 5ceb57d..4ece21d 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
@@ -92,7 +92,6 @@ public interface Schema {
Schema OPTIONAL_STRING_SCHEMA = SchemaBuilder.string().optional().build();
Schema OPTIONAL_BYTES_SCHEMA = SchemaBuilder.bytes().optional().build();
-
/**
* @return the type of this schema
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
index fe9d474..d9c149d 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
@@ -21,6 +21,7 @@ import org.apache.kafka.copycat.errors.DataException;
import org.apache.kafka.copycat.errors.SchemaBuilderException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
@@ -346,7 +347,7 @@ public class SchemaBuilder implements Schema {
* @return the {@link Schema}
*/
public Schema build() {
- return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc, fields, keySchema, valueSchema);
+ return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc, fields == null ? null : Collections.unmodifiableList(fields), keySchema, valueSchema);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
index 05286a1..7f54c10 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.copycat.connector.CopycatRecord;
import org.apache.kafka.copycat.data.Schema;
+import java.util.Map;
+
/**
* <p>
* SourceRecords are generated by SourceTasks and passed to Copycat for storage in
@@ -41,47 +43,32 @@ import org.apache.kafka.copycat.data.Schema;
*/
@InterfaceStability.Unstable
public class SourceRecord extends CopycatRecord {
- private final Schema sourcePartitionSchema;
- private final Object sourcePartition;
- private final Schema sourceOffsetSchema;
- private final Object sourceOffset;
+ private final Map<String, ?> sourcePartition;
+ private final Map<String, ?> sourceOffset;
- public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
- Schema sourceOffsetSchema, Object sourceOffset,
+ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Integer partition, Schema valueSchema, Object value) {
- this(sourcePartitionSchema, sourcePartition, sourceOffsetSchema, sourceOffset, topic, partition, null, null, valueSchema, value);
+ this(sourcePartition, sourceOffset, topic, partition, null, null, valueSchema, value);
}
- public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
- Schema sourceOffsetSchema, Object sourceOffset,
+ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Schema valueSchema, Object value) {
- this(sourcePartitionSchema, sourcePartition, sourceOffsetSchema, sourceOffset, topic, null, null, null, valueSchema, value);
+ this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value);
}
- public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition,
- Schema sourceOffsetSchema, Object sourceOffset,
+ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Integer partition,
Schema keySchema, Object key, Schema valueSchema, Object value) {
super(topic, partition, keySchema, key, valueSchema, value);
- this.sourcePartitionSchema = sourcePartitionSchema;
this.sourcePartition = sourcePartition;
- this.sourceOffsetSchema = sourceOffsetSchema;
this.sourceOffset = sourceOffset;
}
- public Schema sourcePartitionSchema() {
- return sourcePartitionSchema;
- }
-
- public Object sourcePartition() {
+ public Map<String, ?> sourcePartition() {
return sourcePartition;
}
- public Schema sourceOffsetSchema() {
- return sourceOffsetSchema;
- }
-
- public Object sourceOffset() {
+ public Map<String, ?> sourceOffset() {
return sourceOffset;
}
@@ -96,10 +83,6 @@ public class SourceRecord extends CopycatRecord {
SourceRecord that = (SourceRecord) o;
- if (sourcePartitionSchema != null ? !sourcePartitionSchema.equals(that.sourcePartitionSchema) : that.sourcePartitionSchema != null)
- return false;
- if (sourceOffsetSchema != null ? !sourceOffsetSchema.equals(that.sourceOffsetSchema) : that.sourceOffsetSchema != null)
- return false;
if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null)
return false;
if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
@@ -111,8 +94,6 @@ public class SourceRecord extends CopycatRecord {
@Override
public int hashCode() {
int result = super.hashCode();
- result = 31 * result + (sourcePartitionSchema != null ? sourcePartitionSchema.hashCode() : 0);
- result = 31 * result + (sourceOffsetSchema != null ? sourceOffsetSchema.hashCode() : 0);
result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0);
result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0);
return result;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
index dd2068d..d51b789 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
@@ -21,28 +21,37 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaAndValue;
+import java.util.Map;
+
/**
* The Converter interface provides support for translating between Copycat's runtime data format
- * and the "native" runtime format used by the serialization layer. This is used to translate
- * two types of data: records and offsets. The (de)serialization is performed by a separate
- * component -- the producer or consumer serializer or deserializer for records or a Copycat
- * serializer or deserializer for offsets.
+ * and byte[]. Internally, this likely includes an intermediate step to the format used by the serialization
+ * layer (e.g. JsonNode, GenericRecord, Message).
*/
@InterfaceStability.Unstable
-public interface Converter<T> {
+public interface Converter {
+
+ /**
+ * Configure this class.
+ * @param configs configs in key/value pairs
+ * @param isKey whether is for key or value
+ */
+ void configure(Map<String, ?> configs, boolean isKey);
/**
* Convert a Copycat data object to a native object for serialization.
+ * @param topic the topic associated with the data
* @param schema the schema for the value
* @param value the value to convert
* @return
*/
- T fromCopycatData(Schema schema, Object value);
+ byte[] fromCopycatData(String topic, Schema schema, Object value);
/**
* Convert a native object to a Copycat data object.
+ * @param topic the topic associated with the data
* @param value the value to convert
* @return an object containing the {@link Schema} and the converted value
*/
- SchemaAndValue toCopycatData(T value);
+ SchemaAndValue toCopycatData(String topic, byte[] value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
index b51fbde..95d2c04 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
@@ -18,15 +18,20 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.data.SchemaAndValue;
import java.util.Collection;
import java.util.Map;
/**
+ * <p>
* OffsetStorageReader provides access to the offset storage used by sources. This can be used by
* connectors to determine offsets to start consuming data from. This is most commonly used during
* initialization of a task, but can also be used during runtime, e.g. when reconfiguring a task.
+ * </p>
+ * <p>
+ * Offsets are always defined as Maps of Strings to primitive types, i.e. all types supported by
+ * {@link org.apache.kafka.copycat.data.Schema} other than Array, Map, and Struct.
+ * </p>
*/
@InterfaceStability.Unstable
public interface OffsetStorageReader {
@@ -37,12 +42,12 @@ public interface OffsetStorageReader {
* @param partition object uniquely identifying the partition of data
* @return object uniquely identifying the offset in the partition of data
*/
- SchemaAndValue offset(SchemaAndValue partition);
+ <T> Map<String, Object> offset(Map<String, T> partition);
/**
* <p>
* Get a set of offsets for the specified partition identifiers. This may be more efficient
- * than calling {@link #offset(SchemaAndValue)} repeatedly.
+ * than calling {@link #offset(Map)} repeatedly.
* </p>
* <p>
* Note that when errors occur, this method omits the associated data and tries to return as
@@ -56,5 +61,5 @@ public interface OffsetStorageReader {
* @param partitions set of identifiers for partitions of data
* @return a map of partition identifiers to decoded offsets
*/
- Map<SchemaAndValue, SchemaAndValue> offsets(Collection<SchemaAndValue> partitions);
+ <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java
new file mode 100644
index 0000000..8d708f8
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.apache.kafka.copycat.errors.DataException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link Converter} implementation that only supports serializing to strings. When converting Copycat data to bytes,
+ * the schema will be ignored and {@link Object#toString()} will always be invoked to convert the data to a String.
+ * When converting from bytes to Copycat format, the converter will only ever return an optional string schema and
+ * a string or null.
+ *
+ * Encoding configuration is identical to {@link StringSerializer} and {@link StringDeserializer}, but for convenience
+ * this class can also be configured with a single converter.encoding setting, which takes precedence over
+ * serializer.encoding and deserializer.encoding and is applied to both serialization and deserialization.
+ */
+public class StringConverter implements Converter {
+    private final StringSerializer serializer = new StringSerializer();
+    private final StringDeserializer deserializer = new StringDeserializer();
+
+    public StringConverter() {
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // Work on copies so the encoding override below never modifies the caller's map.
+        Map<String, Object> serializerConfigs = new HashMap<String, Object>(configs);
+        Map<String, Object> deserializerConfigs = new HashMap<String, Object>(configs);
+
+        // converter.encoding, when present, overrides the per-direction encoding settings.
+        Object encodingValue = configs.get("converter.encoding");
+        if (encodingValue != null) {
+            serializerConfigs.put("serializer.encoding", encodingValue);
+            deserializerConfigs.put("deserializer.encoding", encodingValue);
+        }
+        serializer.configure(serializerConfigs, isKey);
+        deserializer.configure(deserializerConfigs, isKey);
+    }
+
+    @Override
+    public byte[] fromCopycatData(String topic, Schema schema, Object value) {
+        try {
+            // The schema is deliberately ignored; any non-null value is rendered via toString().
+            return serializer.serialize(topic, value == null ? null : value.toString());
+        } catch (SerializationException e) {
+            throw new DataException("Failed to serialize to a string: ", e);
+        }
+    }
+
+    @Override
+    public SchemaAndValue toCopycatData(String topic, byte[] value) {
+        try {
+            return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, deserializer.deserialize(topic, value));
+        } catch (SerializationException e) {
+            throw new DataException("Failed to deserialize string: ", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java
new file mode 100644
index 0000000..3ea69c1
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+
+public class StringConverterTest {
+    private static final String TOPIC = "topic";
+    private static final String SAMPLE_STRING = "a string";
+    // JUnit 4 creates a new test-class instance per test method, so configure() in one test cannot leak into another.
+    private StringConverter converter = new StringConverter();
+
+    @Test
+    public void testStringToBytes() throws UnsupportedEncodingException {
+        assertArrayEquals(SAMPLE_STRING.getBytes("UTF8"), converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING));
+    }
+
+    @Test
+    public void testNonStringToBytes() throws UnsupportedEncodingException {
+        assertArrayEquals("true".getBytes("UTF8"), converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true));
+    }
+
+    @Test
+    public void testNullToBytes() {
+        assertEquals(null, converter.fromCopycatData(TOPIC, Schema.OPTIONAL_STRING_SCHEMA, null));
+    }
+
+    @Test
+    public void testToBytesIgnoresSchema() throws UnsupportedEncodingException {
+        assertArrayEquals("true".getBytes("UTF8"), converter.fromCopycatData(TOPIC, null, true));
+    }
+
+    @Test
+    public void testToBytesNonUtf8Encoding() throws UnsupportedEncodingException {
+        converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true);
+        assertArrayEquals(SAMPLE_STRING.getBytes("UTF-16"), converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING));
+    }
+
+    @Test
+    public void testBytesToString() throws UnsupportedEncodingException {
+        SchemaAndValue data = converter.toCopycatData(TOPIC, SAMPLE_STRING.getBytes("UTF8"));
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
+        assertEquals(SAMPLE_STRING, data.value());
+    }
+
+    @Test
+    public void testBytesNullToString() {
+        SchemaAndValue data = converter.toCopycatData(TOPIC, null);
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
+        assertEquals(null, data.value());
+    }
+
+    @Test
+    public void testBytesToStringNonUtf8Encoding() throws UnsupportedEncodingException {
+        converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true);
+        SchemaAndValue data = converter.toCopycatData(TOPIC, SAMPLE_STRING.getBytes("UTF-16"));
+        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
+        assertEquals(SAMPLE_STRING, data.value());
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
index a841386..91292e9 100644
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
+++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
@@ -18,7 +18,6 @@
package org.apache.kafka.copycat.file;
import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTask;
@@ -26,17 +25,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
/**
* FileStreamSourceTask reads from stdin or a file.
*/
public class FileStreamSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
- private static final Schema OFFSET_KEY_SCHEMA = Schema.STRING_SCHEMA;
- private static final Schema OFFSET_VALUE_SCHEMA = Schema.OPTIONAL_INT64_SCHEMA;
+ public static final String FILENAME_FIELD = "filename";
+ public static final String POSITION_FIELD = "position";
private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
private String filename;
@@ -66,14 +63,14 @@ public class FileStreamSourceTask extends SourceTask {
if (stream == null) {
try {
stream = new FileInputStream(filename);
- SchemaAndValue offsetWithSchema = context.offsetStorageReader().offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, filename));
- if (offsetWithSchema != null) {
- if (!offsetWithSchema.schema().equals(OFFSET_VALUE_SCHEMA))
- throw new CopycatException("Unexpected offset schema.");
- Long lastRecordedOffset = (Long) offsetWithSchema.value();
+ Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
+ if (offset != null) {
+ Object lastRecordedOffset = offset.get(POSITION_FIELD);
+ if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long))
+ throw new CopycatException("Offset position is the incorrect type");
if (lastRecordedOffset != null) {
log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
- long skipLeft = lastRecordedOffset;
+ long skipLeft = (Long) lastRecordedOffset;
while (skipLeft > 0) {
try {
long skipped = stream.skip(skipLeft);
@@ -85,7 +82,7 @@ public class FileStreamSourceTask extends SourceTask {
}
log.debug("Skipped to offset {}", lastRecordedOffset);
}
- streamOffset = (lastRecordedOffset != null) ? lastRecordedOffset : 0L;
+ streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
} else {
streamOffset = 0L;
}
@@ -130,7 +127,7 @@ public class FileStreamSourceTask extends SourceTask {
if (line != null) {
if (records == null)
records = new ArrayList<>();
- records.add(new SourceRecord(OFFSET_KEY_SCHEMA, filename, OFFSET_VALUE_SCHEMA, streamOffset, topic, VALUE_SCHEMA, line));
+ records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line));
}
new ArrayList<SourceRecord>();
} while (line != null);
@@ -193,4 +190,12 @@ public class FileStreamSourceTask extends SourceTask {
this.notify();
}
}
+
+ private Map<String, String> offsetKey(String filename) {
+ return Collections.singletonMap(FILENAME_FIELD, filename);
+ }
+
+ private Map<String, Long> offsetValue(Long pos) {
+ return Collections.singletonMap(POSITION_FIELD, pos);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
index ab89b6a..d2781c9 100644
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
+++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.copycat.file;
-import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTaskContext;
@@ -31,7 +30,9 @@ import org.powermock.api.easymock.PowerMock;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
@@ -89,7 +90,8 @@ public class FileStreamSourceTaskTest {
assertEquals(1, records.size());
assertEquals(TOPIC, records.get(0).topic());
assertEquals("partial line finished", records.get(0).value());
- assertEquals(22L, records.get(0).sourceOffset());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 22L), records.get(0).sourceOffset());
assertEquals(null, task.poll());
// Different line endings, and make sure the final \r doesn't result in a line until we can
@@ -99,20 +101,25 @@ public class FileStreamSourceTaskTest {
records = task.poll();
assertEquals(4, records.size());
assertEquals("line1", records.get(0).value());
- assertEquals(28L, records.get(0).sourceOffset());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 28L), records.get(0).sourceOffset());
assertEquals("line2", records.get(1).value());
- assertEquals(35L, records.get(1).sourceOffset());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(1).sourcePartition());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 35L), records.get(1).sourceOffset());
assertEquals("line3", records.get(2).value());
- assertEquals(41L, records.get(2).sourceOffset());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(2).sourcePartition());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 41L), records.get(2).sourceOffset());
assertEquals("line4", records.get(3).value());
- assertEquals(47L, records.get(3).sourceOffset());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(3).sourcePartition());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 47L), records.get(3).sourceOffset());
os.write("subsequent text".getBytes());
os.flush();
records = task.poll();
assertEquals(1, records.size());
assertEquals("", records.get(0).value());
- assertEquals(48L, records.get(0).sourceOffset());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
+ assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset());
task.stop();
}
@@ -135,6 +142,6 @@ public class FileStreamSourceTaskTest {
private void expectOffsetLookupReturnNone() {
- EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(SchemaAndValue.class))).andReturn(null);
+ EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
index 67df11d..1841640 100644
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.copycat.data.*;
import org.apache.kafka.copycat.errors.DataException;
import org.apache.kafka.copycat.storage.Converter;
@@ -32,7 +33,9 @@ import java.util.*;
/**
* Implementation of Converter that uses JSON to store schemas and objects.
*/
-public class JsonConverter implements Converter<JsonNode> {
+public class JsonConverter implements Converter {
+ private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
+ private static final boolean SCHEMAS_ENABLE_DEFAULT = true;
private static final HashMap<Schema.Type, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS = new HashMap<>();
@@ -117,9 +120,7 @@ public class JsonConverter implements Converter<JsonNode> {
public Object convert(Schema schema, JsonNode value) {
if (value.isNull()) return checkOptionalAndDefault(schema);
- Schema elemSchema = schema.valueSchema();
- if (elemSchema == null)
- throw new DataException("Array schema did not specify the element type");
+ Schema elemSchema = schema == null ? null : schema.valueSchema();
ArrayList<Object> result = new ArrayList<>();
for (JsonNode elem : value) {
result.add(convertToCopycat(elemSchema, elem));
@@ -132,13 +133,14 @@ public class JsonConverter implements Converter<JsonNode> {
public Object convert(Schema schema, JsonNode value) {
if (value.isNull()) return checkOptionalAndDefault(schema);
- Schema keySchema = schema.keySchema();
- Schema valueSchema = schema.valueSchema();
+ Schema keySchema = schema == null ? null : schema.keySchema();
+ Schema valueSchema = schema == null ? null : schema.valueSchema();
// If the map uses strings for keys, it should be encoded in the natural JSON format. If it uses other
- // primitive types or a complex type as a key, it will be encoded as a list of pairs
+ // primitive types or a complex type as a key, it will be encoded as a list of pairs. If we don't have a
+ // schema, we default to encoding in a Map.
Map<Object, Object> result = new HashMap<>();
- if (keySchema.type() == Schema.Type.STRING) {
+ if (schema == null || keySchema.type() == Schema.Type.STRING) {
if (!value.isObject())
throw new DataException("Map's with string fields should be encoded as JSON objects, but found " + value.getNodeType());
Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields();
@@ -182,24 +184,73 @@ public class JsonConverter implements Converter<JsonNode> {
}
});
+
+ }
+
+ private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;
+
+ private final JsonSerializer serializer = new JsonSerializer();
+ private final JsonDeserializer deserializer = new JsonDeserializer();
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ Object enableConfigsVal = configs.get(SCHEMAS_ENABLE_CONFIG);
+ if (enableConfigsVal != null)
+ enableSchemas = enableConfigsVal.toString().equals("true");
+
+ serializer.configure(configs, isKey);
+ deserializer.configure(configs, isKey);
}
@Override
- public JsonNode fromCopycatData(Schema schema, Object value) {
- return convertToJsonWithSchemaEnvelope(schema, value);
+ public byte[] fromCopycatData(String topic, Schema schema, Object value) {
+ JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema, value) : convertToJsonWithoutEnvelope(schema, value);
+ try {
+ return serializer.serialize(topic, jsonValue);
+ } catch (SerializationException e) {
+ throw new DataException("Converting Copycat data to byte[] failed due to serialization error: ", e);
+ }
}
@Override
- public SchemaAndValue toCopycatData(JsonNode value) {
- if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
- throw new DataException("JSON value converted to Copycat must be in envelope containing schema");
+ public SchemaAndValue toCopycatData(String topic, byte[] value) {
+ JsonNode jsonValue;
+ try {
+ jsonValue = deserializer.deserialize(topic, value);
+ } catch (SerializationException e) {
+ throw new DataException("Converting byte[] to Copycat data failed due to serialization error: ", e);
+ }
- Schema schema = asCopycatSchema(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
- return new SchemaAndValue(schema, convertToCopycat(schema, value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
+ if (enableSchemas && (jsonValue == null || !jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has("schema") || !jsonValue.has("payload")))
+ throw new DataException("JsonDeserializer with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields");
+
+ // The deserialized data should either be an envelope object containing the schema and the payload or the schema
+ // was stripped during serialization and we need to fill in an all-encompassing schema.
+ if (!enableSchemas) {
+ ObjectNode envelope = JsonNodeFactory.instance.objectNode();
+ envelope.set("schema", null);
+ envelope.set("payload", jsonValue);
+ jsonValue = envelope;
+ }
+
+ return jsonToCopycat(jsonValue);
}
+ private SchemaAndValue jsonToCopycat(JsonNode jsonValue) {
+ if (jsonValue == null)
+ return SchemaAndValue.NULL;
+
+ if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
+ throw new DataException("JSON value converted to Copycat must be in envelope containing schema");
+
+ Schema schema = asCopycatSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+ return new SchemaAndValue(schema, convertToCopycat(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
+ }
private static ObjectNode asJsonSchema(Schema schema) {
+ if (schema == null)
+ return null;
+
final ObjectNode jsonSchema;
switch (schema.type()) {
case BOOLEAN:
@@ -369,16 +420,22 @@ public class JsonConverter implements Converter<JsonNode> {
* @param value the value
* @return JsonNode-encoded version
*/
- private static JsonNode convertToJsonWithSchemaEnvelope(Schema schema, Object value) {
+ private static JsonNode convertToJsonWithEnvelope(Schema schema, Object value) {
return new JsonSchema.Envelope(asJsonSchema(schema), convertToJson(schema, value)).toJsonNode();
}
+ private static JsonNode convertToJsonWithoutEnvelope(Schema schema, Object value) {
+ return convertToJson(schema, value);
+ }
+
/**
* Convert this object, in the org.apache.kafka.copycat.data format, into a JSON object, returning both the schema
* and the converted object.
*/
private static JsonNode convertToJson(Schema schema, Object value) {
if (value == null) {
+ if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema
+ return null;
if (schema.defaultValue() != null)
return convertToJson(schema, schema.defaultValue());
if (schema.isOptional())
@@ -386,85 +443,141 @@ public class JsonConverter implements Converter<JsonNode> {
throw new DataException("Conversion error: null value for field that is required and has no default value");
}
- switch (schema.type()) {
- case INT8:
- return JsonNodeFactory.instance.numberNode((Byte) value);
- case INT16:
- return JsonNodeFactory.instance.numberNode((Short) value);
- case INT32:
- return JsonNodeFactory.instance.numberNode((Integer) value);
- case INT64:
- return JsonNodeFactory.instance.numberNode((Long) value);
- case FLOAT32:
- return JsonNodeFactory.instance.numberNode((Float) value);
- case FLOAT64:
- return JsonNodeFactory.instance.numberNode((Double) value);
- case BOOLEAN:
- return JsonNodeFactory.instance.booleanNode((Boolean) value);
- case STRING:
- CharSequence charSeq = (CharSequence) value;
- return JsonNodeFactory.instance.textNode(charSeq.toString());
- case BYTES:
- if (value instanceof byte[])
- return JsonNodeFactory.instance.binaryNode((byte[]) value);
- else if (value instanceof ByteBuffer)
- return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array());
- else
- throw new DataException("Invalid type for bytes type: " + value.getClass());
- case ARRAY: {
- if (!(value instanceof Collection))
- throw new DataException("Invalid type for array type: " + value.getClass());
- Collection collection = (Collection) value;
- ArrayNode list = JsonNodeFactory.instance.arrayNode();
- for (Object elem : collection) {
- JsonNode fieldValue = convertToJson(schema.valueSchema(), elem);
- list.add(fieldValue);
- }
- return list;
+ try {
+ final Schema.Type schemaType;
+ if (schema == null) {
+ schemaType = CopycatSchema.schemaType(value.getClass());
+ if (schemaType == null)
+ throw new DataException("Java class " + value.getClass() + " does not have corresponding schema type.");
+ } else {
+ schemaType = schema.type();
}
- case MAP: {
- if (!(value instanceof Map))
- throw new DataException("Invalid type for array type: " + value.getClass());
- Map<?, ?> map = (Map<?, ?>) value;
- // If true, using string keys and JSON object; if false, using non-string keys and Array-encoding
- boolean objectMode = schema.keySchema().type() == Schema.Type.STRING;
- ObjectNode obj = null;
- ArrayNode list = null;
- if (objectMode)
- obj = JsonNodeFactory.instance.objectNode();
- else
- list = JsonNodeFactory.instance.arrayNode();
- for (Map.Entry<?, ?> entry : map.entrySet()) {
- JsonNode mapKey = convertToJson(schema.keySchema(), entry.getKey());
- JsonNode mapValue = convertToJson(schema.valueSchema(), entry.getValue());
-
+ switch (schemaType) {
+ case INT8:
+ return JsonNodeFactory.instance.numberNode((Byte) value);
+ case INT16:
+ return JsonNodeFactory.instance.numberNode((Short) value);
+ case INT32:
+ return JsonNodeFactory.instance.numberNode((Integer) value);
+ case INT64:
+ return JsonNodeFactory.instance.numberNode((Long) value);
+ case FLOAT32:
+ return JsonNodeFactory.instance.numberNode((Float) value);
+ case FLOAT64:
+ return JsonNodeFactory.instance.numberNode((Double) value);
+ case BOOLEAN:
+ return JsonNodeFactory.instance.booleanNode((Boolean) value);
+ case STRING:
+ CharSequence charSeq = (CharSequence) value;
+ return JsonNodeFactory.instance.textNode(charSeq.toString());
+ case BYTES:
+ if (value instanceof byte[])
+ return JsonNodeFactory.instance.binaryNode((byte[]) value);
+ else if (value instanceof ByteBuffer)
+ return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array());
+ else
+ throw new DataException("Invalid type for bytes type: " + value.getClass());
+ case ARRAY: {
+ Collection collection = (Collection) value;
+ ArrayNode list = JsonNodeFactory.instance.arrayNode();
+ for (Object elem : collection) {
+ Schema valueSchema = schema == null ? null : schema.valueSchema();
+ JsonNode fieldValue = convertToJson(valueSchema, elem);
+ list.add(fieldValue);
+ }
+ return list;
+ }
+ case MAP: {
+ Map<?, ?> map = (Map<?, ?>) value;
+ // If true, using string keys and JSON object; if false, using non-string keys and Array-encoding
+ boolean objectMode;
+ if (schema == null) {
+ objectMode = true;
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ if (!(entry.getKey() instanceof String)) {
+ objectMode = false;
+ break;
+ }
+ }
+ } else {
+ objectMode = schema.keySchema().type() == Schema.Type.STRING;
+ }
+ ObjectNode obj = null;
+ ArrayNode list = null;
if (objectMode)
- obj.set(mapKey.asText(), mapValue);
+ obj = JsonNodeFactory.instance.objectNode();
else
- list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
+ list = JsonNodeFactory.instance.arrayNode();
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ Schema keySchema = schema == null ? null : schema.keySchema();
+ Schema valueSchema = schema == null ? null : schema.valueSchema();
+ JsonNode mapKey = convertToJson(keySchema, entry.getKey());
+ JsonNode mapValue = convertToJson(valueSchema, entry.getValue());
+
+ if (objectMode)
+ obj.set(mapKey.asText(), mapValue);
+ else
+ list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
+ }
+ return objectMode ? obj : list;
}
- return objectMode ? obj : list;
- }
- case STRUCT: {
- if (!(value instanceof Struct))
- throw new DataException("Invalid type for struct type: " + value.getClass());
- Struct struct = (Struct) value;
- if (struct.schema() != schema)
- throw new DataException("Mismatching schema.");
- ObjectNode obj = JsonNodeFactory.instance.objectNode();
- for (Field field : schema.fields()) {
- obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
+ case STRUCT: {
+ Struct struct = (Struct) value;
+ if (struct.schema() != schema)
+ throw new DataException("Mismatching schema.");
+ ObjectNode obj = JsonNodeFactory.instance.objectNode();
+ for (Field field : schema.fields()) {
+ obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
+ }
+ return obj;
}
- return obj;
}
- }
- throw new DataException("Couldn't convert " + value + " to JSON.");
+ throw new DataException("Couldn't convert " + value + " to JSON.");
+ } catch (ClassCastException e) {
+ throw new DataException("Invalid type for " + schema.type() + ": " + value.getClass());
+ }
}
private static Object convertToCopycat(Schema schema, JsonNode jsonValue) {
- JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schema.type());
+ JsonToCopycatTypeConverter typeConverter;
+ final Schema.Type schemaType;
+ if (schema != null) {
+ schemaType = schema.type();
+ } else {
+ switch (jsonValue.getNodeType()) {
+ case NULL:
+ // Special case. With no schema
+ return null;
+ case BOOLEAN:
+ schemaType = Schema.Type.BOOLEAN;
+ break;
+ case NUMBER:
+ if (jsonValue.isIntegralNumber())
+ schemaType = Schema.Type.INT64;
+ else
+ schemaType = Schema.Type.FLOAT64;
+ break;
+ case ARRAY:
+ schemaType = Schema.Type.ARRAY;
+ break;
+ case OBJECT:
+ schemaType = Schema.Type.MAP;
+ break;
+ case STRING:
+ schemaType = Schema.Type.STRING;
+ break;
+
+ case BINARY:
+ case MISSING:
+ case POJO:
+ default:
+ schemaType = null;
+ break;
+ }
+ }
+ typeConverter = TO_COPYCAT_CONVERTERS.get(schemaType);
if (typeConverter == null)
throw new DataException("Unknown schema type: " + schema.type());
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
index 29c7bac..1661754 100644
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
@@ -18,9 +18,6 @@ package org.apache.kafka.copycat.json;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
@@ -31,22 +28,6 @@ import java.util.Map;
* structured data without having associated Java classes. This deserializer also supports Copycat schemas.
*/
public class JsonDeserializer implements Deserializer<JsonNode> {
- private static final ObjectNode CATCH_ALL_OBJECT_SCHEMA = JsonNodeFactory.instance.objectNode();
- private static final ObjectNode CATCH_ALL_ARRAY_SCHEMA = JsonNodeFactory.instance.objectNode();
- private static final ArrayNode ALL_SCHEMAS_LIST = JsonNodeFactory.instance.arrayNode();
- private static final ObjectNode CATCH_ALL_SCHEMA = JsonNodeFactory.instance.objectNode();
- static {
- CATCH_ALL_OBJECT_SCHEMA.put("type", "object")
- .putArray("field").add(JsonNodeFactory.instance.objectNode().put("*", "all"));
-
- CATCH_ALL_ARRAY_SCHEMA.put("type", "array").put("items", "all");
-
- ALL_SCHEMAS_LIST.add("boolean").add("int").add("long").add("float").add("double").add("bytes").add("string")
- .add(CATCH_ALL_ARRAY_SCHEMA).add(CATCH_ALL_OBJECT_SCHEMA);
-
- CATCH_ALL_SCHEMA.put("name", "all").set("type", ALL_SCHEMAS_LIST);
- }
-
private ObjectMapper objectMapper = new ObjectMapper();
/**
@@ -61,6 +42,9 @@ public class JsonDeserializer implements Deserializer<JsonNode> {
@Override
public JsonNode deserialize(String topic, byte[] bytes) {
+ if (bytes == null)
+ return null;
+
JsonNode data;
try {
data = objectMapper.readTree(bytes);
@@ -68,15 +52,6 @@ public class JsonDeserializer implements Deserializer<JsonNode> {
throw new SerializationException(e);
}
- // The deserialized data should either be an envelope object containing the schema and the payload or the schema
- // was stripped during serialization and we need to fill in an all-encompassing schema.
- if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload")) {
- ObjectNode envelope = JsonNodeFactory.instance.objectNode();
- envelope.set("schema", CATCH_ALL_SCHEMA);
- envelope.set("payload", data);
- data = envelope;
- }
-
return data;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
index 80df6be..129d14b 100644
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
@@ -28,12 +28,7 @@ import java.util.Map;
* structured data without corresponding Java classes. This serializer also supports Copycat schemas.
*/
public class JsonSerializer implements Serializer<JsonNode> {
-
- private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
- private static final boolean SCHEMAS_ENABLE_DEFAULT = true;
-
private final ObjectMapper objectMapper = new ObjectMapper();
- private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;
/**
* Default constructor needed by Kafka
@@ -44,9 +39,6 @@ public class JsonSerializer implements Serializer<JsonNode> {
@Override
public void configure(Map<String, ?> config, boolean isKey) {
- Object enableConfigsVal = config.get(SCHEMAS_ENABLE_CONFIG);
- if (enableConfigsVal != null)
- enableSchemas = enableConfigsVal.toString().equals("true");
}
@Override
@@ -54,14 +46,7 @@ public class JsonSerializer implements Serializer<JsonNode> {
if (data == null)
return null;
- // This serializer works for Copycat data that requires a schema to be included, so we expect it to have a
- // specific format: { "schema": {...}, "payload": ... }.
- if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload"))
- throw new SerializationException("JsonSerializer requires \"schema\" and \"payload\" fields and may not contain additional fields");
-
try {
- if (!enableSchemas)
- data = data.get("payload");
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON message", e);
[2/3] kafka git commit: KAFKA-2475: Make Copycat only have a
Converter class instead of Serializer, Deserializer, and Converter.
Posted by gw...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
index ab4a86e..214f9ce 100644
--- a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
+++ b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
@@ -19,19 +19,19 @@ package org.apache.kafka.copycat.json;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.apache.kafka.copycat.data.Struct;
+import org.apache.kafka.copycat.errors.DataException;
import org.junit.Test;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -39,6 +39,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class JsonConverterTest {
+ private static final String TOPIC = "topic";
ObjectMapper objectMapper = new ObjectMapper();
JsonConverter converter = new JsonConverter();
@@ -48,51 +49,51 @@ public class JsonConverterTest {
@Test
public void testCopycatSchemaMetadataTranslation() {
// this validates the non-type fields are translated and handled properly
- assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }")));
- assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }")));
+ assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()));
+ assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }".getBytes()));
assertEquals(new SchemaAndValue(SchemaBuilder.bool().defaultValue(true).build(), true),
- converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }")));
+ converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }".getBytes()));
assertEquals(new SchemaAndValue(SchemaBuilder.bool().required().name("bool").version(2).doc("the documentation").build(), true),
- converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\"}, \"payload\": true }")));
+ converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\"}, \"payload\": true }".getBytes()));
}
// Schema types
@Test
public void booleanToCopycat() {
- assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }")));
- assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }")));
+ assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()));
+ assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }".getBytes()));
}
@Test
public void byteToCopycat() {
- assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int8\" }, \"payload\": 12 }")));
+ assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int8\" }, \"payload\": 12 }".getBytes()));
}
@Test
public void shortToCopycat() {
- assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int16\" }, \"payload\": 12 }")));
+ assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int16\" }, \"payload\": 12 }".getBytes()));
}
@Test
public void intToCopycat() {
- assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int32\" }, \"payload\": 12 }")));
+ assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int32\" }, \"payload\": 12 }".getBytes()));
}
@Test
public void longToCopycat() {
- assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 12L), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int64\" }, \"payload\": 12 }")));
- assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int64\" }, \"payload\": 4398046511104 }")));
+ assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 12L), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 12 }".getBytes()));
+ assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 4398046511104 }".getBytes()));
}
@Test
public void floatToCopycat() {
- assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }")));
+ assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }".getBytes()));
}
@Test
public void doubleToCopycat() {
- assertEquals(new SchemaAndValue(Schema.FLOAT64_SCHEMA, 12.34), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }")));
+ assertEquals(new SchemaAndValue(Schema.FLOAT64_SCHEMA, 12.34), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }".getBytes()));
}
@@ -100,69 +101,105 @@ public class JsonConverterTest {
public void bytesToCopycat() throws UnsupportedEncodingException {
ByteBuffer reference = ByteBuffer.wrap("test-string".getBytes("UTF-8"));
String msg = "{ \"schema\": { \"type\": \"bytes\" }, \"payload\": \"dGVzdC1zdHJpbmc=\" }";
- SchemaAndValue schemaAndValue = converter.toCopycatData(parse(msg));
+ SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes());
ByteBuffer converted = ByteBuffer.wrap((byte[]) schemaAndValue.value());
assertEquals(reference, converted);
}
@Test
public void stringToCopycat() {
- assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }")));
+ assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes()));
}
@Test
public void arrayToCopycat() {
- JsonNode arrayJson = parse("{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int32\" } }, \"payload\": [1, 2, 3] }");
- assertEquals(new SchemaAndValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3)), converter.toCopycatData(arrayJson));
+ byte[] arrayJson = "{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int32\" } }, \"payload\": [1, 2, 3] }".getBytes();
+ assertEquals(new SchemaAndValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3)), converter.toCopycatData(TOPIC, arrayJson));
}
@Test
public void mapToCopycatStringKeys() {
- JsonNode mapJson = parse("{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} }");
+ byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} }".getBytes();
Map<String, Integer> expected = new HashMap<>();
expected.put("key1", 12);
expected.put("key2", 15);
- assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(mapJson));
+ assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(TOPIC, mapJson));
}
@Test
public void mapToCopycatNonStringKeys() {
- JsonNode mapJson = parse("{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ [1, 12], [2, 15] ] }");
+ byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ [1, 12], [2, 15] ] }".getBytes();
Map<Integer, Integer> expected = new HashMap<>();
expected.put(1, 12);
expected.put(2, 15);
- assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(mapJson));
+ assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(TOPIC, mapJson));
}
@Test
public void structToCopycat() {
- JsonNode structJson = parse("{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\" }, { \"field\": \"field2\", \"type\": \"string\" }] }, \"payload\": { \"field1\": true, \"field2\": \"string\" } }");
+ byte[] structJson = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\" }, { \"field\": \"field2\", \"type\": \"string\" }] }, \"payload\": { \"field1\": true, \"field2\": \"string\" } }".getBytes();
Schema expectedSchema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build();
Struct expected = new Struct(expectedSchema).put("field1", true).put("field2", "string");
- SchemaAndValue converted = converter.toCopycatData(structJson);
+ SchemaAndValue converted = converter.toCopycatData(TOPIC, structJson);
assertEquals(new SchemaAndValue(expectedSchema, expected), converted);
}
+ @Test(expected = DataException.class)
+ public void nullToCopycat() {
+ // When schemas are enabled, trying to decode a null should be an error -- we should *always* have the envelope
+ assertEquals(SchemaAndValue.NULL, converter.toCopycatData(TOPIC, null));
+ }
+
+ @Test
+ public void nullSchemaPrimitiveToCopycat() {
+ SchemaAndValue converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": null }".getBytes());
+ assertEquals(SchemaAndValue.NULL, converted);
+
+ converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": true }".getBytes());
+ assertEquals(new SchemaAndValue(null, true), converted);
+
+ // Integers: Copycat has more data types, and JSON unfortunately mixes all number types. We try to preserve
+ // info as best we can, so we always use the largest integer and floating point numbers we can and have Jackson
+ // determine if it's an integer or not
+ converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": 12 }".getBytes());
+ assertEquals(new SchemaAndValue(null, 12L), converted);
+
+ converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": 12.24 }".getBytes());
+ assertEquals(new SchemaAndValue(null, 12.24), converted);
+
+ converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": \"a string\" }".getBytes());
+ assertEquals(new SchemaAndValue(null, "a string"), converted);
+
+ converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": [1, \"2\", 3] }".getBytes());
+ assertEquals(new SchemaAndValue(null, Arrays.asList(1L, "2", 3L)), converted);
+
+ converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": { \"field1\": 1, \"field2\": 2} }".getBytes());
+ Map<String, Long> obj = new HashMap<>();
+ obj.put("field1", 1L);
+ obj.put("field2", 2L);
+ assertEquals(new SchemaAndValue(null, obj), converted);
+ }
+
// Schema metadata
@Test
public void testJsonSchemaMetadataTranslation() {
- JsonNode converted = converter.fromCopycatData(Schema.BOOLEAN_SCHEMA, true);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
- converted = converter.fromCopycatData(Schema.OPTIONAL_BOOLEAN_SCHEMA, null);
+ converted = parse(converter.fromCopycatData(TOPIC, Schema.OPTIONAL_BOOLEAN_SCHEMA, null));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"boolean\", \"optional\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isNull());
- converted = converter.fromCopycatData(SchemaBuilder.bool().defaultValue(true).build(), true);
+ converted = parse(converter.fromCopycatData(TOPIC, SchemaBuilder.bool().defaultValue(true).build(), true));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"default\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
- converted = converter.fromCopycatData(SchemaBuilder.bool().required().name("bool").version(3).doc("the documentation").build(), true);
+ converted = parse(converter.fromCopycatData(TOPIC, SchemaBuilder.bool().required().name("bool").version(3).doc("the documentation").build(), true));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 3, \"doc\": \"the documentation\"}"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
@@ -173,7 +210,7 @@ public class JsonConverterTest {
@Test
public void booleanToJson() {
- JsonNode converted = converter.fromCopycatData(Schema.BOOLEAN_SCHEMA, true);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
@@ -181,7 +218,7 @@ public class JsonConverterTest {
@Test
public void byteToJson() {
- JsonNode converted = converter.fromCopycatData(Schema.INT8_SCHEMA, (byte) 12);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT8_SCHEMA, (byte) 12));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"int8\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
@@ -189,7 +226,7 @@ public class JsonConverterTest {
@Test
public void shortToJson() {
- JsonNode converted = converter.fromCopycatData(Schema.INT16_SCHEMA, (short) 12);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT16_SCHEMA, (short) 12));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"int16\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
@@ -197,7 +234,7 @@ public class JsonConverterTest {
@Test
public void intToJson() {
- JsonNode converted = converter.fromCopycatData(Schema.INT32_SCHEMA, 12);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT32_SCHEMA, 12));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"int32\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
@@ -205,7 +242,7 @@ public class JsonConverterTest {
@Test
public void longToJson() {
- JsonNode converted = converter.fromCopycatData(Schema.INT64_SCHEMA, 4398046511104L);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT64_SCHEMA, 4398046511104L));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"int64\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(4398046511104L, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).longValue());
@@ -213,7 +250,7 @@ public class JsonConverterTest {
@Test
public void floatToJson() {
- JsonNode converted = converter.fromCopycatData(Schema.FLOAT32_SCHEMA, 12.34f);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.FLOAT32_SCHEMA, 12.34f));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"float\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(12.34f, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).floatValue(), 0.001);
@@ -221,7 +258,7 @@ public class JsonConverterTest {
@Test
public void doubleToJson() {
- JsonNode converted = converter.fromCopycatData(Schema.FLOAT64_SCHEMA, 12.34);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.FLOAT64_SCHEMA, 12.34));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"double\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(12.34, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).doubleValue(), 0.001);
@@ -229,7 +266,7 @@ public class JsonConverterTest {
@Test
public void bytesToJson() throws IOException {
- JsonNode converted = converter.fromCopycatData(Schema.BYTES_SCHEMA, "test-string".getBytes());
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BYTES_SCHEMA, "test-string".getBytes()));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(ByteBuffer.wrap("test-string".getBytes()),
@@ -238,7 +275,7 @@ public class JsonConverterTest {
@Test
public void stringToJson() {
- JsonNode converted = converter.fromCopycatData(Schema.STRING_SCHEMA, "test-string");
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, "test-string"));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"string\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals("test-string", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).textValue());
@@ -247,7 +284,7 @@ public class JsonConverterTest {
@Test
public void arrayToJson() {
Schema int32Array = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
- JsonNode converted = converter.fromCopycatData(int32Array, Arrays.asList(1, 2, 3));
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, int32Array, Arrays.asList(1, 2, 3)));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"array\", \"items\": { \"type\": \"int32\", \"optional\": false }, \"optional\": false }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
@@ -261,7 +298,7 @@ public class JsonConverterTest {
Map<String, Integer> input = new HashMap<>();
input.put("key1", 12);
input.put("key2", 15);
- JsonNode converted = converter.fromCopycatData(stringIntMap, input);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, stringIntMap, input));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"string\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
@@ -275,21 +312,28 @@ public class JsonConverterTest {
Map<Integer, Integer> input = new HashMap<>();
input.put(1, 12);
input.put(2, 15);
- JsonNode converted = converter.fromCopycatData(intIntMap, input);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, intIntMap, input));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"int32\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
- assertEquals(JsonNodeFactory.instance.arrayNode()
- .add(JsonNodeFactory.instance.arrayNode().add(1).add(12))
- .add(JsonNodeFactory.instance.arrayNode().add(2).add(15)),
- converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+
+ assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray());
+ ArrayNode payload = (ArrayNode) converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
+ assertEquals(2, payload.size());
+ Set<JsonNode> payloadEntries = new HashSet<>();
+ for (JsonNode elem : payload)
+ payloadEntries.add(elem);
+ assertEquals(new HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add(1).add(12),
+ JsonNodeFactory.instance.arrayNode().add(2).add(15))),
+ payloadEntries
+ );
}
@Test
public void structToJson() {
Schema schema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build();
Struct input = new Struct(schema).put("field1", true).put("field2", "string");
- JsonNode converted = converter.fromCopycatData(schema, input);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, schema, input));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"struct\", \"optional\": false, \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\", \"optional\": false }, { \"field\": \"field2\", \"type\": \"string\", \"optional\": false }] }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
@@ -299,6 +343,102 @@ public class JsonConverterTest {
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}
+
+ @Test
+ public void nullSchemaAndPrimitiveToJson() {
+ // This still needs to do conversion of data, null schema means "anything goes"
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, true));
+ validateEnvelopeNullSchema(converted);
+ assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+ assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
+ }
+
+ @Test
+ public void nullSchemaAndArrayToJson() {
+ // This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match
+ // types to verify conversion still works.
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, Arrays.asList(1, "string", true)));
+ validateEnvelopeNullSchema(converted);
+ assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+ assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add("string").add(true),
+ converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+ }
+
+ @Test
+ public void nullSchemaAndMapToJson() {
+ // This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match
+ // types to verify conversion still works.
+ Map<String, Object> input = new HashMap<>();
+ input.put("key1", 12);
+ input.put("key2", "string");
+ input.put("key3", true);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, input));
+ validateEnvelopeNullSchema(converted);
+ assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+ assertEquals(JsonNodeFactory.instance.objectNode().put("key1", 12).put("key2", "string").put("key3", true),
+ converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+ }
+
+ @Test
+ public void nullSchemaAndMapNonStringKeysToJson() {
+ // This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match
+ // types to verify conversion still works.
+ Map<Object, Object> input = new HashMap<>();
+ input.put("string", 12);
+ input.put(52, "string");
+ input.put(false, true);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, input));
+ validateEnvelopeNullSchema(converted);
+ assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+ assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray());
+ ArrayNode payload = (ArrayNode) converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
+ assertEquals(3, payload.size());
+ Set<JsonNode> payloadEntries = new HashSet<>();
+ for (JsonNode elem : payload)
+ payloadEntries.add(elem);
+ assertEquals(new HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add("string").add(12),
+ JsonNodeFactory.instance.arrayNode().add(52).add("string"),
+ JsonNodeFactory.instance.arrayNode().add(false).add(true))),
+ payloadEntries
+ );
+ }
+
+
+ @Test(expected = DataException.class)
+ public void mismatchSchemaJson() {
+ // If we have mismatching schema info, we should properly convert to a DataException
+ converter.fromCopycatData(TOPIC, Schema.FLOAT64_SCHEMA, true);
+ }
+
+
+
+ @Test
+ public void noSchemaToCopycat() {
+ Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
+ converter.configure(props, true);
+ assertEquals(new SchemaAndValue(null, true), converter.toCopycatData(TOPIC, "true".getBytes()));
+ }
+
+ @Test
+ public void noSchemaToJson() {
+ Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
+ converter.configure(props, true);
+ JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, true));
+ assertTrue(converted.isBoolean());
+ assertEquals(true, converted.booleanValue());
+ }
+
+
+
+ private JsonNode parse(byte[] json) {
+ try {
+ return objectMapper.readTree(json);
+ } catch (IOException e) {
+ fail("IOException during JSON parse: " + e.getMessage());
+ throw new RuntimeException("failed");
+ }
+ }
+
private JsonNode parse(String json) {
try {
return objectMapper.readTree(json);
@@ -316,4 +456,13 @@ public class JsonConverterTest {
assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isObject());
assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}
+
+ private void validateEnvelopeNullSchema(JsonNode env) {
+ assertNotNull(env);
+ assertTrue(env.isObject());
+ assertEquals(2, env.size());
+ assertTrue(env.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+ assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
+ assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
index 23cdf4d..a976d90 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
@@ -32,8 +32,7 @@ import java.util.Properties;
public class WorkerConfig extends AbstractConfig {
public static final String CLUSTER_CONFIG = "cluster";
- private static final String
- CLUSTER_CONFIG_DOC =
+ private static final String CLUSTER_CONFIG_DOC =
"ID for this cluster, which is used to provide a namespace so multiple Copycat clusters "
+ "or instances may co-exist while sharing a single Kafka cluster.";
public static final String CLUSTER_DEFAULT = "copycat";
@@ -58,21 +57,13 @@ public class WorkerConfig extends AbstractConfig {
public static final String VALUE_CONVERTER_CLASS_DOC =
"Converter class for value Copycat data that implements the <code>Converter</code> interface.";
- public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
- public static final String KEY_SERIALIZER_CLASS_DOC =
- "Serializer class for key that implements the <code>Serializer</code> interface.";
+ public static final String OFFSET_KEY_CONVERTER_CLASS_CONFIG = "offset.key.converter";
+ public static final String OFFSET_KEY_CONVERTER_CLASS_DOC =
+ "Converter class for offset key Copycat data that implements the <code>Converter</code> interface.";
- public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
- public static final String VALUE_SERIALIZER_CLASS_DOC =
- "Serializer class for value that implements the <code>Serializer</code> interface.";
-
- public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
- public static final String KEY_DESERIALIZER_CLASS_DOC =
- "Serializer class for key that implements the <code>Deserializer</code> interface.";
-
- public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
- public static final String VALUE_DESERIALIZER_CLASS_DOC =
- "Deserializer class for value that implements the <code>Deserializer</code> interface.";
+ public static final String OFFSET_VALUE_CONVERTER_CLASS_CONFIG = "offset.value.converter";
+ public static final String OFFSET_VALUE_CONVERTER_CLASS_DOC =
+ "Converter class for offset value Copycat data that implements the <code>Converter</code> interface.";
public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
= "task.shutdown.graceful.timeout.ms";
@@ -104,14 +95,10 @@ public class WorkerConfig extends AbstractConfig {
Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
- .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
- Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
- .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
- Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
- .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
- Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
- .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
- Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC)
+ .define(OFFSET_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
+ Importance.HIGH, OFFSET_KEY_CONVERTER_CLASS_DOC)
+ .define(OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
+ Importance.HIGH, OFFSET_VALUE_CONVERTER_CLASS_DOC)
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index 704470a..6cbce0b 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -17,8 +17,6 @@
package org.apache.kafka.copycat.runtime;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -47,34 +45,36 @@ import java.util.Properties;
* Since each task has a dedicated thread, this is mainly just a container for them.
* </p>
*/
-public class Worker<K, V> {
+public class Worker {
private static final Logger log = LoggerFactory.getLogger(Worker.class);
private Time time;
private WorkerConfig config;
- private Converter<K> keyConverter;
- private Converter<V> valueConverter;
+ private Converter keyConverter;
+ private Converter valueConverter;
+ private Converter offsetKeyConverter;
+ private Converter offsetValueConverter;
private OffsetBackingStore offsetBackingStore;
- private Serializer<K> offsetKeySerializer;
- private Serializer<V> offsetValueSerializer;
- private Deserializer<K> offsetKeyDeserializer;
- private Deserializer<V> offsetValueDeserializer;
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
- private KafkaProducer<K, V> producer;
+ private KafkaProducer<byte[], byte[]> producer;
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
public Worker(WorkerConfig config) {
- this(new SystemTime(), config, null, null, null, null, null);
+ this(new SystemTime(), config, null);
}
@SuppressWarnings("unchecked")
- public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
- Serializer offsetKeySerializer, Serializer offsetValueSerializer,
- Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
+ public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
this.time = time;
this.config = config;
this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+ this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true);
this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+ this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false);
+ this.offsetKeyConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+ this.offsetKeyConverter.configure(config.originalsWithPrefix("offset.key.converter."), true);
+ this.offsetValueConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+ this.offsetValueConverter.configure(config.originalsWithPrefix("offset.value.converter."), false);
if (offsetBackingStore != null) {
this.offsetBackingStore = offsetBackingStore;
@@ -82,34 +82,6 @@ public class Worker<K, V> {
this.offsetBackingStore = new FileOffsetBackingStore();
this.offsetBackingStore.configure(config.originals());
}
-
- if (offsetKeySerializer != null) {
- this.offsetKeySerializer = offsetKeySerializer;
- } else {
- this.offsetKeySerializer = config.getConfiguredInstance(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
- this.offsetKeySerializer.configure(config.originals(), true);
- }
-
- if (offsetValueSerializer != null) {
- this.offsetValueSerializer = offsetValueSerializer;
- } else {
- this.offsetValueSerializer = config.getConfiguredInstance(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
- this.offsetValueSerializer.configure(config.originals(), false);
- }
-
- if (offsetKeyDeserializer != null) {
- this.offsetKeyDeserializer = offsetKeyDeserializer;
- } else {
- this.offsetKeyDeserializer = config.getConfiguredInstance(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- this.offsetKeyDeserializer.configure(config.originals(), true);
- }
-
- if (offsetValueDeserializer != null) {
- this.offsetValueDeserializer = offsetValueDeserializer;
- } else {
- this.offsetValueDeserializer = config.getConfiguredInstance(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- this.offsetValueDeserializer.configure(config.originals(), false);
- }
}
public void start() {
@@ -119,8 +91,8 @@ public class Worker<K, V> {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName());
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName());
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
for (String propName : unusedConfigs.stringPropertyNames()) {
producerProps.put(propName, unusedConfigs.getProperty(propName));
}
@@ -188,14 +160,14 @@ public class Worker<K, V> {
final WorkerTask workerTask;
if (task instanceof SourceTask) {
SourceTask sourceTask = (SourceTask) task;
- OffsetStorageReader offsetReader = new OffsetStorageReaderImpl<>(offsetBackingStore, id.connector(),
- keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer);
- OffsetStorageWriter<K, V> offsetWriter = new OffsetStorageWriter<>(offsetBackingStore, id.connector(),
- keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
- workerTask = new WorkerSourceTask<>(id, sourceTask, keyConverter, valueConverter, producer,
+ OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
+ offsetKeyConverter, offsetValueConverter);
+ OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
+ offsetKeyConverter, offsetValueConverter);
+ workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
offsetReader, offsetWriter, config, time);
} else if (task instanceof SinkTask) {
- workerTask = new WorkerSinkTask<>(id, (SinkTask) task, config, keyConverter, valueConverter, time);
+ workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
} else {
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask");
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index dfb1f96..7e71fb8 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -38,21 +38,21 @@ import java.util.concurrent.TimeUnit;
/**
* WorkerTask that uses a SinkTask to export data from Kafka.
*/
-class WorkerSinkTask<K, V> implements WorkerTask {
+class WorkerSinkTask implements WorkerTask {
private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
private final ConnectorTaskId id;
private final SinkTask task;
private final WorkerConfig workerConfig;
private final Time time;
- private final Converter<K> keyConverter;
- private final Converter<V> valueConverter;
+ private final Converter keyConverter;
+ private final Converter valueConverter;
private WorkerSinkTaskThread workThread;
- private KafkaConsumer<K, V> consumer;
+ private KafkaConsumer<byte[], byte[]> consumer;
private final SinkTaskContext context;
public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
- Converter<K> keyConverter, Converter<V> valueConverter, Time time) {
+ Converter keyConverter, Converter valueConverter, Time time) {
this.id = id;
this.task = task;
this.workerConfig = workerConfig;
@@ -107,7 +107,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
public void poll(long timeoutMs) {
try {
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
- ConsumerRecords<K, V> msgs = consumer.poll(timeoutMs);
+ ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
log.trace("{} polling returned {} messages", id, msgs.count());
deliverMessages(msgs);
} catch (ConsumerWakeupException we) {
@@ -154,7 +154,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
return workerConfig;
}
- private KafkaConsumer<K, V> createConsumer(Properties taskProps) {
+ private KafkaConsumer<byte[], byte[]> createConsumer(Properties taskProps) {
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty())
throw new CopycatException("Sink tasks require a list of topics.");
@@ -168,12 +168,10 @@ class WorkerSinkTask<K, V> implements WorkerTask {
Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- workerConfig.getClass(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG).getName());
- props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- workerConfig.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName());
+ props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- KafkaConsumer<K, V> newConsumer;
+ KafkaConsumer<byte[], byte[]> newConsumer;
try {
newConsumer = new KafkaConsumer<>(props);
} catch (Throwable t) {
@@ -202,14 +200,14 @@ class WorkerSinkTask<K, V> implements WorkerTask {
return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
}
- private void deliverMessages(ConsumerRecords<K, V> msgs) {
+ private void deliverMessages(ConsumerRecords<byte[], byte[]> msgs) {
// Finally, deliver this batch to the sink
if (msgs.count() > 0) {
List<SinkRecord> records = new ArrayList<>();
- for (ConsumerRecord<K, V> msg : msgs) {
+ for (ConsumerRecord<byte[], byte[]> msg : msgs) {
log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
- SchemaAndValue keyAndSchema = msg.key() != null ? keyConverter.toCopycatData(msg.key()) : SchemaAndValue.NULL;
- SchemaAndValue valueAndSchema = msg.value() != null ? valueConverter.toCopycatData(msg.value()) : SchemaAndValue.NULL;
+ SchemaAndValue keyAndSchema = keyConverter.toCopycatData(msg.topic(), msg.key());
+ SchemaAndValue valueAndSchema = valueConverter.toCopycatData(msg.topic(), msg.value());
records.add(
new SinkRecord(msg.topic(), msg.partition(),
keyAndSchema.schema(), keyAndSchema.value(),
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index 14b9c3a..ee0a532 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -23,7 +23,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.copycat.cli.WorkerConfig;
-import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTask;
import org.apache.kafka.copycat.source.SourceTaskContext;
@@ -46,33 +45,31 @@ import java.util.concurrent.TimeoutException;
/**
* WorkerTask that uses a SourceTask to ingest data into Kafka.
*/
-class WorkerSourceTask<K, V> implements WorkerTask {
+class WorkerSourceTask implements WorkerTask {
private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
private ConnectorTaskId id;
private SourceTask task;
- private final Converter<K> keyConverter;
- private final Converter<V> valueConverter;
- private KafkaProducer<K, V> producer;
+ private final Converter keyConverter;
+ private final Converter valueConverter;
+ private KafkaProducer<byte[], byte[]> producer;
private WorkerSourceTaskThread workThread;
private OffsetStorageReader offsetReader;
- private OffsetStorageWriter<K, V> offsetWriter;
+ private OffsetStorageWriter offsetWriter;
private final WorkerConfig workerConfig;
private final Time time;
// Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because
// there is no IdentityHashSet.
- private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
- outstandingMessages;
+ private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
// A second buffer is used while an offset flush is running
- private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
- outstandingMessagesBacklog;
+ private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
private boolean flushing;
public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
- Converter<K> keyConverter, Converter<V> valueConverter,
- KafkaProducer<K, V> producer,
- OffsetStorageReader offsetReader, OffsetStorageWriter<K, V> offsetWriter,
+ Converter keyConverter, Converter valueConverter,
+ KafkaProducer<byte[], byte[]> producer,
+ OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter,
WorkerConfig workerConfig, Time time) {
this.id = id;
this.task = task;
@@ -132,10 +129,9 @@ class WorkerSourceTask<K, V> implements WorkerTask {
*/
private synchronized void sendRecords(List<SourceRecord> records) {
for (SourceRecord record : records) {
- K key = (record.keySchema() != null) ? keyConverter.fromCopycatData(record.keySchema(), record.key()) : null;
- V value = (record.valueSchema() != null) ? valueConverter.fromCopycatData(record.valueSchema(), record.value()) : null;
- final ProducerRecord<K, V> producerRecord = new ProducerRecord<>(
- record.topic(), record.kafkaPartition(), key, value);
+ byte[] key = keyConverter.fromCopycatData(record.topic(), record.keySchema(), record.key());
+ byte[] value = valueConverter.fromCopycatData(record.topic(), record.valueSchema(), record.value());
+ final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value);
log.trace("Appending record with key {}, value {}", record.key(), record.value());
if (!flushing) {
outstandingMessages.put(producerRecord, producerRecord);
@@ -158,13 +154,12 @@ class WorkerSourceTask<K, V> implements WorkerTask {
}
});
// Offsets are converted & serialized in the OffsetWriter
- offsetWriter.offset(new SchemaAndValue(record.sourcePartitionSchema(), record.sourcePartition()),
- new SchemaAndValue(record.sourceOffsetSchema(), record.sourceOffset()));
+ offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
}
}
- private synchronized void recordSent(final ProducerRecord<K, V> record) {
- ProducerRecord<K, V> removed = outstandingMessages.remove(record);
+ private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record) {
+ ProducerRecord<byte[], byte[]> removed = outstandingMessages.remove(record);
// While flushing, we may also see callbacks for items in the backlog
if (removed == null && flushing)
removed = outstandingMessagesBacklog.remove(record);
@@ -276,7 +271,7 @@ class WorkerSourceTask<K, V> implements WorkerTask {
private void finishSuccessfulFlush() {
// If we were successful, we can just swap instead of replacing items back into the original map
- IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>> temp = outstandingMessages;
+ IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> temp = outstandingMessages;
outstandingMessages = outstandingMessagesBacklog;
outstandingMessagesBacklog = temp;
flushing = false;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
index 237eda6..7521955 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
@@ -17,8 +17,6 @@
package org.apache.kafka.copycat.storage;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.errors.CopycatException;
import org.slf4j.Logger;
@@ -35,39 +33,36 @@ import java.util.Map;
* directly, the interface is only separate from this implementation because it needs to be
* included in the public API package.
*/
-public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
+public class OffsetStorageReaderImpl implements OffsetStorageReader {
private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
private final OffsetBackingStore backingStore;
private final String namespace;
- private final Converter<K> keyConverter;
- private final Converter<V> valueConverter;
- private final Serializer<K> keySerializer;
- private final Deserializer<V> valueDeserializer;
+ private final Converter keyConverter;
+ private final Converter valueConverter;
public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
- Converter<K> keyConverter, Converter<V> valueConverter,
- Serializer<K> keySerializer, Deserializer<V> valueDeserializer) {
+ Converter keyConverter, Converter valueConverter) {
this.backingStore = backingStore;
this.namespace = namespace;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
- this.keySerializer = keySerializer;
- this.valueDeserializer = valueDeserializer;
}
@Override
- public SchemaAndValue offset(SchemaAndValue partition) {
+ public <T> Map<String, Object> offset(Map<String, T> partition) {
return offsets(Arrays.asList(partition)).get(partition);
}
@Override
- public Map<SchemaAndValue, SchemaAndValue> offsets(Collection<SchemaAndValue> partitions) {
+ public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions) {
// Serialize keys so backing store can work with them
- Map<ByteBuffer, SchemaAndValue> serializedToOriginal = new HashMap<>(partitions.size());
- for (SchemaAndValue key : partitions) {
+ Map<ByteBuffer, Map<String, T>> serializedToOriginal = new HashMap<>(partitions.size());
+ for (Map<String, T> key : partitions) {
try {
- byte[] keySerialized = keySerializer.serialize(namespace, keyConverter.fromCopycatData(key.schema(), key.value()));
+ // Offsets are treated as schemaless, their format is only validated here (and the returned value below)
+ OffsetUtils.validateFormat(key);
+ byte[] keySerialized = keyConverter.fromCopycatData(namespace, null, key);
ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
serializedToOriginal.put(keyBuffer, key);
} catch (Throwable t) {
@@ -87,7 +82,7 @@ public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
}
// Deserialize all the values and map back to the original keys
- Map<SchemaAndValue, SchemaAndValue> result = new HashMap<>(partitions.size());
+ Map<Map<String, T>, Map<String, Object>> result = new HashMap<>(partitions.size());
for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
try {
// Since null could be a valid key, explicitly check whether map contains the key
@@ -96,12 +91,12 @@ public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
+ "store may have returned invalid data", rawEntry.getKey());
continue;
}
- SchemaAndValue origKey = serializedToOriginal.get(rawEntry.getKey());
- SchemaAndValue deserializedValue = valueConverter.toCopycatData(
- valueDeserializer.deserialize(namespace, rawEntry.getValue().array())
- );
+ Map<String, T> origKey = serializedToOriginal.get(rawEntry.getKey());
+ SchemaAndValue deserializedSchemaAndValue = valueConverter.toCopycatData(namespace, rawEntry.getValue().array());
+ Object deserializedValue = deserializedSchemaAndValue.value();
+ OffsetUtils.validateFormat(deserializedValue);
- result.put(origKey, deserializedValue);
+ result.put(origKey, (Map<String, Object>) deserializedValue);
} catch (Throwable t) {
log.error("CRITICAL: Failed to deserialize offset data when getting offsets for task with"
+ " namespace {}. No value for this data will be returned, which may break the "
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
index 4fb75e7..be8c718 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
@@ -17,16 +17,13 @@
package org.apache.kafka.copycat.storage;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.Future;
/**
@@ -64,32 +61,27 @@ import java.util.concurrent.Future;
* This class is not thread-safe. It should only be accessed from a Task's processing thread.
* </p>
*/
-public class OffsetStorageWriter<K, V> {
+public class OffsetStorageWriter {
private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);
private final OffsetBackingStore backingStore;
- private final Converter<K> keyConverter;
- private final Converter<V> valueConverter;
- private final Serializer<K> keySerializer;
- private final Serializer<V> valueSerializer;
+ private final Converter keyConverter;
+ private final Converter valueConverter;
private final String namespace;
// Offset data in Copycat format
- private Map<SchemaAndValue, SchemaAndValue> data = new HashMap<>();
+ private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>();
// Not synchronized, should only be accessed by flush thread
- private Map<SchemaAndValue, SchemaAndValue> toFlush = null;
+ private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
// Unique ID for each flush request to handle callbacks after timeouts
private long currentFlushId = 0;
public OffsetStorageWriter(OffsetBackingStore backingStore,
- String namespace, Converter<K> keyConverter, Converter<V> valueConverter,
- Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+ String namespace, Converter keyConverter, Converter valueConverter) {
this.backingStore = backingStore;
this.namespace = namespace;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
}
/**
@@ -97,8 +89,8 @@ public class OffsetStorageWriter<K, V> {
* @param partition the partition to store an offset for
* @param offset the offset
*/
- public synchronized void offset(SchemaAndValue partition, SchemaAndValue offset) {
- data.put(partition, offset);
+ public synchronized void offset(Map<String, ?> partition, Map<String, ?> offset) {
+ data.put((Map<String, Object>) partition, (Map<String, Object>) offset);
}
private boolean flushing() {
@@ -142,10 +134,14 @@ public class OffsetStorageWriter<K, V> {
Map<ByteBuffer, ByteBuffer> offsetsSerialized;
try {
offsetsSerialized = new HashMap<>();
- for (Map.Entry<SchemaAndValue, SchemaAndValue> entry : toFlush.entrySet()) {
- byte[] key = keySerializer.serialize(namespace, keyConverter.fromCopycatData(entry.getKey().schema(), entry.getKey().value()));
+ for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
+ // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate
+ // for that data. The only enforcement of the format is here.
+ OffsetUtils.validateFormat(entry.getKey());
+ OffsetUtils.validateFormat(entry.getValue());
+ byte[] key = keyConverter.fromCopycatData(namespace, null, entry.getKey());
ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
- byte[] value = valueSerializer.serialize(namespace, valueConverter.fromCopycatData(entry.getValue().schema(), entry.getValue().value()));
+ byte[] value = valueConverter.fromCopycatData(namespace, null, entry.getValue());
ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
offsetsSerialized.put(keyBuffer, valueBuffer);
}
@@ -155,6 +151,7 @@ public class OffsetStorageWriter<K, V> {
log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
+ "offsets under namespace {}. This likely won't recover unless the "
+ "unserializable partition or offset information is overwritten.", namespace);
+ log.error("Cause of serialization failure:", t);
callback.onCompletion(t, null);
return null;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
new file mode 100644
index 0000000..bd3a87b
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.copycat.data.CopycatSchema;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.errors.DataException;
+
+import java.util.Map;
+
+/**
+ * Helpers for validating the format of source partition and source offset data.
+ * <p>
+ * Offsets are passed to the offset converters without a schema, so these checks are where their format is
+ * enforced: the data must be a {@code Map} with {@code String} keys whose values are either null or of a
+ * primitive Copycat schema type. A null map is accepted as well, since callers in this package already
+ * handle null serialized offset keys and values.
+ * </p>
+ */
+public class OffsetUtils {
+    /**
+     * Validate offset data whose static type is not known, such as a value returned by a converter.
+     *
+     * @param offsetData the data to validate; may be null
+     * @throws DataException if {@code offsetData} is non-null and not a {@code Map}, or if the map does not
+     *                       pass {@link #validateFormat(Map)}
+     */
+    @SuppressWarnings("unchecked")
+    public static void validateFormat(Object offsetData) {
+        if (offsetData == null)
+            return;
+        if (!(offsetData instanceof Map))
+            throw new DataException("Offsets must be specified as a Map");
+        validateFormat((Map<Object, Object>) offsetData);
+    }
+
+    /**
+     * Validate that every key of {@code offsetData} is a String and every non-null value has a primitive
+     * Copycat schema type.
+     *
+     * @param offsetData the offset map to validate; may be null
+     * @throws DataException if a key is not a String, or a value's class does not map to a primitive
+     *                       Copycat schema type
+     */
+    public static <K, V> void validateFormat(Map<K, V> offsetData) {
+        // Handle null the same way as the Object overload instead of failing with a NullPointerException.
+        if (offsetData == null)
+            return;
+
+        for (Map.Entry<K, V> entry : offsetData.entrySet()) {
+            if (!(entry.getKey() instanceof String))
+                throw new DataException("Offsets may only use String keys");
+
+            Object value = entry.getValue();
+            if (value == null)
+                continue;
+            Schema.Type schemaType = CopycatSchema.schemaType(value.getClass());
+            // NOTE(review): schemaType() is assumed to return null for classes with no Copycat type (e.g. a
+            // user-defined object); report that as a DataException rather than an NPE on isPrimitive().
+            if (schemaType == null)
+                throw new DataException("Offsets may only contain primitive types as values, but field "
+                        + entry.getKey() + " contains " + value.getClass());
+            if (!schemaType.isPrimitive())
+                throw new DataException("Offsets may only contain primitive types as values, but field "
+                        + entry.getKey() + " contains " + schemaType);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 54e9bc6..542ed76 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -67,10 +67,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
private Time time;
@Mock private SinkTask sinkTask;
private WorkerConfig workerConfig;
- @Mock private Converter<byte[]> keyConverter;
+ @Mock private Converter keyConverter;
@Mock
- private Converter<byte[]> valueConverter;
- private WorkerSinkTask<Integer, String> workerTask;
+ private Converter valueConverter;
+ private WorkerSinkTask workerTask;
@Mock private KafkaConsumer<byte[], byte[]> consumer;
private WorkerSinkTaskThread workerThread;
@@ -84,10 +84,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
Properties workerProps = new Properties();
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
- workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
- workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
- workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+ workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("offset.key.converter.schemas.enable", "false");
+ workerProps.setProperty("offset.value.converter.schemas.enable", "false");
workerConfig = new WorkerConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
@@ -138,12 +138,12 @@ public class WorkerSinkTaskTest extends ThreadedTest {
ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
Collections.singletonMap(
- new TopicPartition("topic", 0),
- Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, RAW_KEY, RAW_VALUE))));
+ new TopicPartition(TOPIC, 0),
+ Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 0, RAW_KEY, RAW_VALUE))));
// Exact data doesn't matter, but should be passed directly to sink task
- EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(record);
- EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(record);
+ EasyMock.expect(keyConverter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(RAW_KEY))).andReturn(record);
+ EasyMock.expect(valueConverter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(RAW_VALUE))).andReturn(record);
Capture<Collection<SinkRecord>> capturedRecords
= EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(capturedRecords));
@@ -320,8 +320,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
return records;
}
});
- EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
- EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
+ EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
+ EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(capturedRecords));
EasyMock.expectLastCall().anyTimes();
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
index 018aa94..3ff3a62 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.source.SourceRecord;
import org.apache.kafka.copycat.source.SourceTask;
import org.apache.kafka.copycat.source.SourceTaskContext;
@@ -45,11 +44,7 @@ import org.powermock.api.easymock.annotation.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -59,37 +54,36 @@ import static org.junit.Assert.*;
@RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest {
- private static final Schema PARTITION_SCHEMA = Schema.BYTES_SCHEMA;
- private static final byte[] PARTITION_BYTES = "partition".getBytes();
- private static final Schema OFFSET_SCHEMA = Schema.BYTES_SCHEMA;
- private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
+ private static final String TOPIC = "topic";
+ private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
+ private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
// Copycat-format data
private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
private static final Integer KEY = -1;
private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
private static final Long RECORD = 12L;
- // Native-formatted data. The actual format of this data doesn't matter -- we just want to see that the right version
+ // Serialized data. The actual format of this data doesn't matter -- we just want to see that the right version
// is used in the right place.
- private static final ByteBuffer CONVERTED_KEY = ByteBuffer.wrap("converted-key".getBytes());
- private static final String CONVERTED_RECORD = "converted-record";
+ private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
+ private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private WorkerConfig config;
@Mock private SourceTask sourceTask;
- @Mock private Converter<ByteBuffer> keyConverter;
- @Mock private Converter<String> valueConverter;
- @Mock private KafkaProducer<ByteBuffer, String> producer;
+ @Mock private Converter keyConverter;
+ @Mock private Converter valueConverter;
+ @Mock private KafkaProducer<byte[], byte[]> producer;
@Mock private OffsetStorageReader offsetReader;
- @Mock private OffsetStorageWriter<ByteBuffer, String> offsetWriter;
- private WorkerSourceTask<ByteBuffer, String> workerTask;
+ @Mock private OffsetStorageWriter offsetWriter;
+ private WorkerSourceTask workerTask;
@Mock private Future<RecordMetadata> sendFuture;
private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
private static final Properties EMPTY_TASK_PROPS = new Properties();
private static final List<SourceRecord> RECORDS = Arrays.asList(
- new SourceRecord(PARTITION_SCHEMA, PARTITION_BYTES, OFFSET_SCHEMA, OFFSET_BYTES, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
+ new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
);
@Override
@@ -98,16 +92,16 @@ public class WorkerSourceTaskTest extends ThreadedTest {
Properties workerProps = new Properties();
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
- workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
- workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
- workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+ workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("offset.key.converter.schemas.enable", "false");
+ workerProps.setProperty("offset.value.converter.schemas.enable", "false");
config = new WorkerConfig(workerProps);
producerCallbacks = EasyMock.newCapture();
}
private void createWorkerTask() {
- workerTask = new WorkerSourceTask<>(taskId, sourceTask, keyConverter, valueConverter, producer,
+ workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
offsetReader, offsetWriter, config, new SystemTime());
}
@@ -201,15 +195,15 @@ public class WorkerSourceTaskTest extends ThreadedTest {
List<SourceRecord> records = new ArrayList<>();
// Can just use the same record for key and value
- records.add(new SourceRecord(PARTITION_SCHEMA, PARTITION_BYTES, OFFSET_SCHEMA, OFFSET_BYTES, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
+ records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
- Capture<ProducerRecord<ByteBuffer, String>> sent = expectSendRecord();
+ Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord();
PowerMock.replayAll();
Whitebox.invokeMethod(workerTask, "sendRecords", records);
- assertEquals(CONVERTED_KEY, sent.getValue().key());
- assertEquals(CONVERTED_RECORD, sent.getValue().value());
+ assertEquals(SERIALIZED_KEY, sent.getValue().key());
+ assertEquals(SERIALIZED_RECORD, sent.getValue().value());
PowerMock.verifyAll();
}
@@ -233,11 +227,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
return latch;
}
- private Capture<ProducerRecord<ByteBuffer, String>> expectSendRecord() throws InterruptedException {
- EasyMock.expect(keyConverter.fromCopycatData(KEY_SCHEMA, KEY)).andStubReturn(CONVERTED_KEY);
- EasyMock.expect(valueConverter.fromCopycatData(RECORD_SCHEMA, RECORD)).andStubReturn(CONVERTED_RECORD);
+ private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() throws InterruptedException {
+ EasyMock.expect(keyConverter.fromCopycatData(TOPIC, KEY_SCHEMA, KEY)).andStubReturn(SERIALIZED_KEY);
+ EasyMock.expect(valueConverter.fromCopycatData(TOPIC, RECORD_SCHEMA, RECORD)).andStubReturn(SERIALIZED_RECORD);
- Capture<ProducerRecord<ByteBuffer, String>> sent = EasyMock.newCapture();
+ Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
// 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
EasyMock.expect(
producer.send(EasyMock.capture(sent),
@@ -255,7 +249,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
});
// 2. Offset data is passed to the offset storage.
- offsetWriter.offset(new SchemaAndValue(PARTITION_SCHEMA, PARTITION_BYTES), new SchemaAndValue(OFFSET_SCHEMA, OFFSET_BYTES));
+ offsetWriter.offset(PARTITION, OFFSET);
PowerMock.expectLastCall().anyTimes();
return sent;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index 32e7ff9..701e230 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -17,8 +17,6 @@
package org.apache.kafka.copycat.runtime;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.copycat.cli.WorkerConfig;
@@ -49,10 +47,6 @@ public class WorkerTest extends ThreadedTest {
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private Worker worker;
private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
- private Serializer offsetKeySerializer = PowerMock.createMock(Serializer.class);
- private Serializer offsetValueSerializer = PowerMock.createMock(Serializer.class);
- private Deserializer offsetKeyDeserializer = PowerMock.createMock(Deserializer.class);
- private Deserializer offsetValueDeserializer = PowerMock.createMock(Deserializer.class);
@Before
public void setup() {
@@ -61,14 +55,12 @@ public class WorkerTest extends ThreadedTest {
Properties workerProps = new Properties();
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
- workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
- workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
- workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+ workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("offset.key.converter.schemas.enable", "false");
+ workerProps.setProperty("offset.value.converter.schemas.enable", "false");
WorkerConfig config = new WorkerConfig(workerProps);
- worker = new Worker(new MockTime(), config, offsetBackingStore,
- offsetKeySerializer, offsetValueSerializer,
- offsetKeyDeserializer, offsetValueDeserializer);
+ worker = new Worker(new MockTime(), config, offsetBackingStore);
worker.start();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3803e5cb/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
index 9c0c52d..956d064 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
@@ -17,10 +17,6 @@
package org.apache.kafka.copycat.storage;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.data.SchemaBuilder;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.util.Callback;
import org.easymock.Capture;
@@ -35,9 +31,7 @@ import org.powermock.api.easymock.annotation.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@@ -48,13 +42,9 @@ import static org.junit.Assert.assertTrue;
public class OffsetStorageWriterTest {
private static final String NAMESPACE = "namespace";
// Copycat format - any types should be accepted here
- private static final Schema OFFSET_KEY_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
- private static final List<String> OFFSET_KEY = Arrays.asList("key", "key");
- private static final Schema OFFSET_VALUE_SCHEMA = Schema.STRING_SCHEMA;
- private static final String OFFSET_VALUE = "value";
- // Native objects - must match serializer types
- private static final int OFFSET_KEY_CONVERTED = 12;
- private static final String OFFSET_VALUE_CONVERTED = "value-converted";
+ private static final Map<String, String> OFFSET_KEY = Collections.singletonMap("key", "key");
+ private static final Map<String, Integer> OFFSET_VALUE = Collections.singletonMap("key", 12);
+
// Serialized
private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
@@ -63,11 +53,9 @@ public class OffsetStorageWriterTest {
ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));
@Mock private OffsetBackingStore store;
- @Mock private Converter<Integer> keyConverter;
- @Mock private Converter<String> valueConverter;
- @Mock private Serializer<Integer> keySerializer;
- @Mock private Serializer<String> valueSerializer;
- private OffsetStorageWriter<Integer, String> writer;
+ @Mock private Converter keyConverter;
+ @Mock private Converter valueConverter;
+ private OffsetStorageWriter writer;
private static Exception exception = new RuntimeException("error");
@@ -75,7 +63,7 @@ public class OffsetStorageWriterTest {
@Before
public void setup() {
- writer = new OffsetStorageWriter<>(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
+ writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter);
service = Executors.newFixedThreadPool(1);
}
@@ -92,7 +80,7 @@ public class OffsetStorageWriterTest {
PowerMock.replayAll();
- writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
+ writer.offset(OFFSET_KEY, OFFSET_VALUE);
assertTrue(writer.beginFlush());
writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
@@ -128,7 +116,7 @@ public class OffsetStorageWriterTest {
PowerMock.replayAll();
- writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
+ writer.offset(OFFSET_KEY, OFFSET_VALUE);
assertTrue(writer.beginFlush());
writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
assertTrue(writer.beginFlush());
@@ -148,7 +136,7 @@ public class OffsetStorageWriterTest {
PowerMock.replayAll();
- writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
+ writer.offset(OFFSET_KEY, OFFSET_VALUE);
assertTrue(writer.beginFlush());
writer.doFlush(callback);
assertTrue(writer.beginFlush()); // should throw
@@ -160,7 +148,7 @@ public class OffsetStorageWriterTest {
public void testCancelBeforeAwaitFlush() {
PowerMock.replayAll();
- writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
+ writer.offset(OFFSET_KEY, OFFSET_VALUE);
assertTrue(writer.beginFlush());
writer.cancelFlush();
@@ -178,7 +166,7 @@ public class OffsetStorageWriterTest {
PowerMock.replayAll();
- writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE));
+ writer.offset(OFFSET_KEY, OFFSET_VALUE);
assertTrue(writer.beginFlush());
// Start the flush, then immediately cancel before allowing the mocked store request to finish
Future<Void> flushFuture = writer.doFlush(callback);
@@ -207,10 +195,8 @@ public class OffsetStorageWriterTest {
private void expectStore(final Callback<Void> callback,
final boolean fail,
final CountDownLatch waitForCompletion) {
- EasyMock.expect(keyConverter.fromCopycatData(OFFSET_KEY_SCHEMA, OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
- EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
- EasyMock.expect(valueConverter.fromCopycatData(OFFSET_VALUE_SCHEMA, OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
- EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
+ EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, null, OFFSET_KEY)).andReturn(OFFSET_KEY_SERIALIZED);
+ EasyMock.expect(valueConverter.fromCopycatData(NAMESPACE, null, OFFSET_VALUE)).andReturn(OFFSET_VALUE_SERIALIZED);
final Capture<Callback<Void>> storeCallback = Capture.newInstance();
EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED),