You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/05/26 21:14:12 UTC

kafka git commit: KAFKA-3723: Cannot change size of schema cache for JSON converter

Repository: kafka
Updated Branches:
  refs/heads/trunk d5366471d -> 3cf2de069


KAFKA-3723: Cannot change size of schema cache for JSON converter

Author: Christian Posta <ch...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1401 from christian-posta/ceposta-connect-class-cast-error


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3cf2de06
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3cf2de06
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3cf2de06

Branch: refs/heads/trunk
Commit: 3cf2de0694cf0e276d25d8c7048a9928b41969a3
Parents: d536647
Author: Christian Posta <ch...@gmail.com>
Authored: Thu May 26 14:13:54 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu May 26 14:13:54 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/connect/json/JsonConverter.java   |  2 +-
 .../kafka/connect/json/JsonConverterTest.java      | 17 +++++++++++++++++
 .../src/test/resources/connect-test.properties     |  2 ++
 .../runtime/distributed/DistributedHerderTest.java |  2 --
 4 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3cf2de06/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index d9a6859..59e653e 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -304,7 +304,7 @@ public class JsonConverter implements Converter {
 
         Object cacheSizeVal = configs.get(SCHEMAS_CACHE_SIZE_CONFIG);
         if (cacheSizeVal != null)
-            cacheSize = (int) cacheSizeVal;
+            cacheSize = Integer.parseInt((String) cacheSizeVal);
         fromConnectSchemaCache = new SynchronizedCache<>(new LRUCache<Schema, ObjectNode>(cacheSize));
         toConnectSchemaCache = new SynchronizedCache<>(new LRUCache<JsonNode, Schema>(cacheSize));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cf2de06/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index c923285..0e7c153 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Date;
 import org.apache.kafka.connect.data.Decimal;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -36,10 +37,13 @@ import org.junit.Before;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -607,6 +611,19 @@ public class JsonConverterTest {
         assertEquals(2, cache.size());
     }
 
+    @Test
+    public void testJsonSchemaCacheSizeFromConfigFile() throws URISyntaxException, IOException {
+        URL url = getClass().getResource("/connect-test.properties");
+        File propFile = new File(url.toURI());
+        String workerPropsFile = propFile.getAbsolutePath();
+        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
+
+        JsonConverter rc = new JsonConverter();
+        rc.configure(workerProps, false);
+
+    }
+
 
     private JsonNode parse(byte[] json) {
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cf2de06/connect/json/src/test/resources/connect-test.properties
----------------------------------------------------------------------
diff --git a/connect/json/src/test/resources/connect-test.properties b/connect/json/src/test/resources/connect-test.properties
new file mode 100644
index 0000000..5b7e788
--- /dev/null
+++ b/connect/json/src/test/resources/connect-test.properties
@@ -0,0 +1,2 @@
+schemas.cache.size=1
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/3cf2de06/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 81e6be8..747db1a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -81,9 +81,7 @@ public class DistributedHerderTest {
         HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
         HERDER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
         HERDER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        HERDER_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs");
         HERDER_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
-        HERDER_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
     }
     private static final String MEMBER_URL = "memberUrl";