You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/15 18:57:39 UTC

kafka git commit: KAFKA-5205: Use default values of keySerde if it is not specified by users in CachingSessionStore [Forced Update!]

Repository: kafka
Updated Branches:
  refs/heads/0.10.2 9668b6bf6 -> 5719e8c9c (forced update)


KAFKA-5205: Use default values of keySerde if it is not specified by users in CachingSessionStore

CachingSessionStore wasn't properly using the default keySerde if no Serde was supplied. I saw the below error in the logs for one of my test cases.

Author: Kyle Winkelman <ky...@optum.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #2963 from KyleWinkelman/CachingSessionStore-fix-keySerde-use

(cherry picked from commit 475cc2544e18b6b321e716691648024cdbbafb16)
Signed-off-by: Guozhang Wang <wa...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5719e8c9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5719e8c9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5719e8c9

Branch: refs/heads/0.10.2
Commit: 5719e8c9c07d7580a8806fddbb7135bd56be9e0c
Parents: c15b93f
Author: Kyle Winkelman <ky...@optum.com>
Authored: Mon May 15 11:51:10 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon May 15 11:56:40 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/state/internals/CachingSessionStore.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5719e8c9/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index ed64246..80160b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -60,7 +60,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
                                                            final long earliestSessionEndTime,
                                                            final long latestSessionStartTime) {
         validateStoreOpen();
-        final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(topic, key));
+        final Bytes binarySessionId = Bytes.wrap(serdes.rawKey(key));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName,
                                                                                   keySchema.lowerRange(binarySessionId,
                                                                                                        earliestSessionEndTime).get(),
@@ -81,7 +81,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
 
     public void put(final Windowed<K> key, AGG value) {
         validateStoreOpen();
-        final Bytes binaryKey = SessionKeySerde.toBinary(key, keySerde.serializer(), topic);
+        final Bytes binaryKey = SessionKeySerde.toBinary(key, serdes.keySerializer(), topic);
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
                                                       key.window().end(), context.partition(), context.topic());
         cache.put(cacheName, binaryKey.get(), entry);
@@ -127,7 +127,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
         final RecordContext current = context.recordContext();
         context.setRecordContext(entry.recordContext());
         try {
-            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic);
+            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), serdes.keyDeserializer(), topic);
             if (flushListener != null) {
                 final AGG newValue = serdes.valueFrom(entry.newValue());
                 final AGG oldValue = fetchPrevious(binaryKey);