You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/29 18:31:05 UTC
kafka git commit: MINOR: remove the init method from Serdes
Repository: kafka
Updated Branches:
refs/heads/trunk a58b459bd -> 598851f19
MINOR: remove the init method from Serdes
guozhangwang
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #834 from ymatsuda/remove_init_from_Serdes
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/598851f1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/598851f1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/598851f1
Branch: refs/heads/trunk
Commit: 598851f19cbf444c1f2530ec27543dd606b6fbc9
Parents: a58b459
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Fri Jan 29 09:31:02 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Jan 29 09:31:02 2016 -0800
----------------------------------------------------------------------
.../org/apache/kafka/streams/state/Serdes.java | 33 +++++++-------------
.../state/internals/MeteredKeyValueStore.java | 1 -
.../streams/state/internals/RocksDBStore.java | 2 --
3 files changed, 11 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/598851f1/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
index 4e1b05a..e1e78af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
public final class Serdes<K, V> {
@@ -63,8 +62,7 @@ public final class Serdes<K, V> {
private Deserializer<V> valueDeserializer;
/**
- * Create a context for serialization using the specified serializers and deserializers, or if any of them are null the
- * corresponding {@link ProcessorContext}'s serializer or deserializer, which
+ * Create a context for serialization using the specified serializers and deserializers which
* <em>must</em> match the key and value types used as parameters for this object.
*
* @param topic the name of the topic
@@ -78,31 +76,22 @@ public final class Serdes<K, V> {
Serializer<K> keySerializer, Deserializer<K> keyDeserializer,
Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) {
this.topic = topic;
+
+ if (keySerializer == null)
+ throw new NullPointerException();
+ if (keyDeserializer == null)
+ throw new NullPointerException();
+ if (valueSerializer == null)
+ throw new NullPointerException();
+ if (valueDeserializer == null)
+ throw new NullPointerException();
+
this.keySerializer = keySerializer;
this.keyDeserializer = keyDeserializer;
this.valueSerializer = valueSerializer;
this.valueDeserializer = valueDeserializer;
}
- /**
- * Create a context for serialization using the {@link ProcessorContext}'s serializers and deserializers, which
- * <em>must</em> match the key and value types used as parameters for this object.
- *
- * @param topic the name of the topic
- */
- @SuppressWarnings("unchecked")
- public Serdes(String topic) {
- this(topic, null, null, null, null);
- }
-
- @SuppressWarnings("unchecked")
- public void init(ProcessorContext context) {
- keySerializer = keySerializer != null ? keySerializer : (Serializer<K>) context.keySerializer();
- keyDeserializer = keyDeserializer != null ? keyDeserializer : (Deserializer<K>) context.keyDeserializer();
- valueSerializer = valueSerializer != null ? valueSerializer : (Serializer<V>) context.valueSerializer();
- valueDeserializer = valueDeserializer != null ? valueDeserializer : (Deserializer<V>) context.valueDeserializer();
- }
-
public Deserializer<K> keyDeserializer() {
return keyDeserializer;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/598851f1/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index d5fe44a..6dee4c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -88,7 +88,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
- serialization.init(context);
this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null;
// register and possibly restore the state from the logs
http://git-wip-us.apache.org/repos/asf/kafka/blob/598851f1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index dea7e0b..b324ff1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -89,8 +89,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
public void init(ProcessorContext context) {
- serdes.init(context);
-
this.context = context;
this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
this.db = openDB(this.dbDir, this.options, TTL_SECONDS);