Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/26 19:49:00 UTC

[jira] [Commented] (KAFKA-4953) Global Store: cast exception when initialising with in-memory logged state store

    [ https://issues.apache.org/jira/browse/KAFKA-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341504#comment-16341504 ] 

ASF GitHub Bot commented on KAFKA-4953:
---------------------------------------

guozhangwang closed pull request #2741: KAFKA-4953: Global Store: cast exception when initialising with in-memory logged state store
URL: https://github.com/apache/kafka/pull/2741
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 65b3e2f67ae..7f5d42da3c5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -40,6 +40,7 @@
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
@@ -221,6 +222,25 @@ public void shouldDriveGlobalStore() throws Exception {
         assertEquals("value2", globalStore.get("key2"));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    @Ignore
+    public void shouldDriveInMemoryLoggedGlobalStore() throws Exception {
+        final StateStoreSupplier storeSupplier = Stores.create("my-store")
+                .withStringKeys().withStringValues().inMemory().build();
+        final String global = "global";
+        final String topic = "topic";
+        final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) storeSupplier.get();
+        final TopologyBuilder topologyBuilder = this.builder
+                .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store")));
+
+        driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
+        driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertEquals("value1", globalStore.get("key1"));
+        assertEquals("value2", globalStore.get("key2"));
+    }
+
     @Test
     public void testDrivingSimpleMultiSourceTopology() {
         int partition = 10;


 



> Global Store: cast exception when initialising with in-memory logged state store
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-4953
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4953
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Yennick Trevels
>            Priority: Major
>              Labels: user-experience
>
> Currently it is not possible to initialise a global store with an in-memory *logged* store via the TopologyBuilder. This results in the following exception:
> {code}
> java.lang.ClassCastException: org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl cannot be cast to org.apache.kafka.streams.processor.internals.RecordCollector$Supplier
> 	at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:52)
> 	at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:44)
> 	at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
> 	at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:97)
> 	at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61)
> 	at org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:215)
> 	at org.apache.kafka.streams.processor.internals.ProcessorTopologyTest.shouldDriveInMemoryLoggedGlobalStore(ProcessorTopologyTest.java:235)
> 	...
> {code}
> I've created a PR which includes a unit test to verify this behavior.
> If the PR below gets merged, the fixing PR should leverage the provided test {{ProcessorTopologyTest#shouldDriveInMemoryLoggedGlobalStore}} by removing the {{@Ignore}} annotation.
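
The stack trace above points at an unchecked downcast in the StoreChangeLogger constructor: the context passed to the store's init is cast to RecordCollector$Supplier, and the global context does not implement that interface. The following is a minimal sketch of that failure mode only. Every type in it (ProcessorContext, RecordCollectorSupplier, TaskContext, GlobalContext, ChangeLogger) is a hypothetical stand-in written for illustration, not the real Kafka Streams class, and the assumption that the regular task context implements the supplier interface is not stated in the report.

```java
// Hypothetical stand-ins for illustration; these are NOT the Kafka Streams classes.
final class CastSketch {

    interface ProcessorContext {}

    // Stand-in for RecordCollector.Supplier named in the stack trace.
    interface RecordCollectorSupplier {}

    // Assumption: a regular task context implements both interfaces.
    static final class TaskContext implements ProcessorContext, RecordCollectorSupplier {}

    // Stand-in for the global context: it does not implement the supplier interface.
    static final class GlobalContext implements ProcessorContext {}

    // Stand-in for a change logger that downcasts the context in its constructor.
    static final class ChangeLogger {
        final RecordCollectorSupplier supplier;

        ChangeLogger(ProcessorContext context) {
            // Compiles for any ProcessorContext, but throws ClassCastException
            // at runtime when the context is not also a RecordCollectorSupplier.
            this.supplier = (RecordCollectorSupplier) context;
        }
    }

    // Returns true when constructing the logger fails with a ClassCastException.
    static boolean initFails(ProcessorContext context) {
        try {
            new ChangeLogger(context);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(initFails(new TaskContext()));   // prints "false"
        System.out.println(initFails(new GlobalContext())); // prints "true"
    }
}
```

The sketch only shows why the cast compiles yet fails at runtime for the global context; it does not suggest how the actual fix should be implemented.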


