You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/29 18:31:05 UTC

kafka git commit: MINOR: remove the init method from Serdes

Repository: kafka
Updated Branches:
  refs/heads/trunk a58b459bd -> 598851f19


MINOR: remove the init method from Serdes

guozhangwang

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #834 from ymatsuda/remove_init_from_Serdes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/598851f1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/598851f1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/598851f1

Branch: refs/heads/trunk
Commit: 598851f19cbf444c1f2530ec27543dd606b6fbc9
Parents: a58b459
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Fri Jan 29 09:31:02 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Jan 29 09:31:02 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/state/Serdes.java  | 33 +++++++-------------
 .../state/internals/MeteredKeyValueStore.java   |  1 -
 .../streams/state/internals/RocksDBStore.java   |  2 --
 3 files changed, 11 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/598851f1/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
index 4e1b05a..e1e78af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
 
 public final class Serdes<K, V> {
 
@@ -63,8 +62,7 @@ public final class Serdes<K, V> {
     private Deserializer<V> valueDeserializer;
 
     /**
-     * Create a context for serialization using the specified serializers and deserializers, or if any of them are null the
-     * corresponding {@link ProcessorContext}'s serializer or deserializer, which
+     * Create a context for serialization using the specified serializers and deserializers which
      * <em>must</em> match the key and value types used as parameters for this object.
      *
      * @param topic the name of the topic
@@ -78,31 +76,22 @@ public final class Serdes<K, V> {
             Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
             Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) {
         this.topic = topic;
+
+        if (keySerializer == null)
+            throw new NullPointerException();
+        if (keyDeserializer == null)
+            throw new NullPointerException();
+        if (valueSerializer == null)
+            throw new NullPointerException();
+        if (valueDeserializer == null)
+            throw new NullPointerException();
+
         this.keySerializer = keySerializer;
         this.keyDeserializer = keyDeserializer;
         this.valueSerializer = valueSerializer;
         this.valueDeserializer = valueDeserializer;
     }
 
-    /**
-     * Create a context for serialization using the {@link ProcessorContext}'s serializers and deserializers, which
-     * <em>must</em> match the key and value types used as parameters for this object.
-     *
-     * @param topic the name of the topic
-     */
-    @SuppressWarnings("unchecked")
-    public Serdes(String topic) {
-        this(topic, null, null, null, null);
-    }
-
-    @SuppressWarnings("unchecked")
-    public void init(ProcessorContext context) {
-        keySerializer = keySerializer != null ? keySerializer : (Serializer<K>) context.keySerializer();
-        keyDeserializer = keyDeserializer != null ? keyDeserializer : (Deserializer<K>) context.keyDeserializer();
-        valueSerializer = valueSerializer != null ? valueSerializer : (Serializer<V>) context.valueSerializer();
-        valueDeserializer = valueDeserializer != null ? valueDeserializer : (Deserializer<V>) context.valueDeserializer();
-    }
-
     public Deserializer<K> keyDeserializer() {
         return keyDeserializer;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/598851f1/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index d5fe44a..6dee4c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -88,7 +88,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
         this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
 
-        serialization.init(context);
         this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null;
 
         // register and possibly restore the state from the logs

http://git-wip-us.apache.org/repos/asf/kafka/blob/598851f1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index dea7e0b..b324ff1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -89,8 +89,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     }
 
     public void init(ProcessorContext context) {
-        serdes.init(context);
-
         this.context = context;
         this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
         this.db = openDB(this.dbDir, this.options, TTL_SECONDS);