You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/05/26 21:14:12 UTC
kafka git commit: KAFKA-3723: Cannot change size of schema cache for JSON converter
Repository: kafka
Updated Branches:
refs/heads/trunk d5366471d -> 3cf2de069
KAFKA-3723: Cannot change size of schema cache for JSON converter
Author: Christian Posta <ch...@gmail.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1401 from christian-posta/ceposta-connect-class-cast-error
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3cf2de06
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3cf2de06
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3cf2de06
Branch: refs/heads/trunk
Commit: 3cf2de0694cf0e276d25d8c7048a9928b41969a3
Parents: d536647
Author: Christian Posta <ch...@gmail.com>
Authored: Thu May 26 14:13:54 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu May 26 14:13:54 2016 -0700
----------------------------------------------------------------------
.../apache/kafka/connect/json/JsonConverter.java | 2 +-
.../kafka/connect/json/JsonConverterTest.java | 17 +++++++++++++++++
.../src/test/resources/connect-test.properties | 2 ++
.../runtime/distributed/DistributedHerderTest.java | 2 --
4 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3cf2de06/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index d9a6859..59e653e 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -304,7 +304,7 @@ public class JsonConverter implements Converter {
Object cacheSizeVal = configs.get(SCHEMAS_CACHE_SIZE_CONFIG);
if (cacheSizeVal != null)
- cacheSize = (int) cacheSizeVal;
+ cacheSize = Integer.parseInt((String) cacheSizeVal);
fromConnectSchemaCache = new SynchronizedCache<>(new LRUCache<Schema, ObjectNode>(cacheSize));
toConnectSchemaCache = new SynchronizedCache<>(new LRUCache<JsonNode, Schema>(cacheSize));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3cf2de06/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index c923285..0e7c153 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -36,10 +37,13 @@ import org.junit.Before;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
+import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.net.URISyntaxException;
+import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Calendar;
@@ -607,6 +611,19 @@ public class JsonConverterTest {
assertEquals(2, cache.size());
}
+ @Test
+ public void testJsonSchemaCacheSizeFromConfigFile() throws URISyntaxException, IOException {
+ URL url = getClass().getResource("/connect-test.properties");
+ File propFile = new File(url.toURI());
+ String workerPropsFile = propFile.getAbsolutePath();
+ Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+ Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
+
+ JsonConverter rc = new JsonConverter();
+ rc.configure(workerProps, false);
+
+ }
+
private JsonNode parse(byte[] json) {
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3cf2de06/connect/json/src/test/resources/connect-test.properties
----------------------------------------------------------------------
diff --git a/connect/json/src/test/resources/connect-test.properties b/connect/json/src/test/resources/connect-test.properties
new file mode 100644
index 0000000..5b7e788
--- /dev/null
+++ b/connect/json/src/test/resources/connect-test.properties
@@ -0,0 +1,2 @@
+schemas.cache.size=1
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/3cf2de06/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 81e6be8..747db1a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -81,9 +81,7 @@ public class DistributedHerderTest {
HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
HERDER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
HERDER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
- HERDER_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs");
HERDER_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
- HERDER_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
}
private static final String MEMBER_URL = "memberUrl";