Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/15 18:18:15 UTC

kafka git commit: KAFKA-5206: Use default aggSerde if no user-overridden is provided in RocksDBSessionStore

Repository: kafka
Updated Branches:
  refs/heads/trunk 82e84fabf -> e40e27b4e


KAFKA-5206: Use default aggSerde if no user-overridden is provided in RocksDBSessionStore

RocksDBSessionStore wasn't properly using the default aggSerde if no Serde was supplied.

Author: Kyle Winkelman <ky...@optum.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #2971 from KyleWinkelman/RocksDBSessionStore-fix-aggSerde-use


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e40e27b4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e40e27b4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e40e27b4

Branch: refs/heads/trunk
Commit: e40e27b4eb99e8931488e86289453c5f14d5e886
Parents: 82e84fa
Author: Kyle Winkelman <ky...@optum.com>
Authored: Mon May 15 11:18:12 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon May 15 11:18:12 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/state/internals/RocksDBSessionStore.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e40e27b4/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index 5027781..103bb55 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -106,6 +106,6 @@ class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
 
     @Override
     public void put(final Windowed<K> sessionKey, final AGG aggregate) {
-        bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer(), topic), aggSerde.serializer().serialize(topic, aggregate));
+        bytesStore.put(SessionKeySerde.toBinary(sessionKey, serdes.keySerializer(), topic), serdes.rawValue(aggregate));
     }
 }
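
The one-line change above routes serialization through `serdes.rawValue(aggregate)` instead of calling `aggSerde.serializer()` directly. The following is a minimal, self-contained sketch of why that matters, assuming (the diff does not show this part of the class) that `aggSerde` can be null when the user supplies no Serde and that `serdes` is built with a fallback to the configured default. `DefaultSerdeFallback`, `ValueSerializer`, `ResolvedSerdes`, `putValue`, and the topic name "sessions" are hypothetical stand-ins for illustration, not Kafka APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

public class DefaultSerdeFallback {

    /** Hypothetical stand-in for a value serializer; not Kafka's Serializer interface. */
    public interface ValueSerializer<T> {
        byte[] serialize(String topic, T value);
    }

    /** Hypothetical stand-in for the holder that `serdes` plays in the diff. */
    static final class ResolvedSerdes<T> {
        private final String topic;
        private final ValueSerializer<T> valueSerializer;

        ResolvedSerdes(String topic,
                       ValueSerializer<T> userSupplied,
                       ValueSerializer<T> configuredDefault) {
            this.topic = topic;
            // Resolve once: fall back to the default when nothing was supplied.
            this.valueSerializer = userSupplied != null ? userSupplied : configuredDefault;
        }

        /** Serializes with whichever serializer was resolved above. */
        byte[] rawValue(T value) {
            return valueSerializer.serialize(topic, value);
        }
    }

    /** Mimics put(): always goes through the resolved holder, never the raw field. */
    public static byte[] putValue(ValueSerializer<String> userSupplied, String aggregate) {
        ValueSerializer<String> configuredDefault =
                (topic, v) -> v.getBytes(StandardCharsets.UTF_8);
        ResolvedSerdes<String> serdes =
                new ResolvedSerdes<>("sessions", userSupplied, configuredDefault);
        return serdes.rawValue(aggregate);
    }

    public static void main(String[] args) {
        // No user serializer: the default is used instead of dereferencing null.
        System.out.println(new String(putValue(null, "count=3"), StandardCharsets.UTF_8));

        // A user-supplied serializer still takes precedence over the default.
        ValueSerializer<String> upper =
                (topic, v) -> v.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
        System.out.println(new String(putValue(upper, "count=3"), StandardCharsets.UTF_8));
    }
}
```

In this sketch, calling the user-supplied serializer directly in `putValue` would throw a NullPointerException in the first case, which is the kind of failure the change appears intended to avoid.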