Posted to jira@kafka.apache.org by "John Roesler (Jira)" <ji...@apache.org> on 2020/06/27 02:47:00 UTC
[jira] [Assigned] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException
[ https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
John Roesler reassigned KAFKA-8630:
-----------------------------------
Assignee: John Roesler
> Unit testing a streams processor with a WindowStore throws a ClassCastException
> -------------------------------------------------------------------------------
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
> Issue Type: Bug
> Components: streams-test-utils
> Affects Versions: 2.3.0
> Reporter: Justin Fetherolf
> Assignee: John Roesler
> Priority: Critical
> Fix For: 2.7.0, 2.6.1
>
>
> I was attempting to write a unit test for a class that implements the {{Processor}} interface and uses a {{WindowStore}}, but the test fails with a {{ClassCastException}} thrown from {{InMemoryWindowStore.init}}, which attempts to cast {{MockProcessorContext}} to {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
>
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
>
> public class InMemWindowProcessor implements Processor<String, String> {
>     private ProcessorContext context;
>     private WindowStore<String, String> windowStore;
>
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>         windowStore = (WindowStore<String, String>) context.getStateStore("my-win-store");
>     }
>
>     @Override
>     public void process(String key, String value) {
>     }
>
>     @Override
>     public void close() {
>     }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
>
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
>
> public class InMemWindowProcessorTest {
>     InMemWindowProcessor processor = null;
>     MockProcessorContext context = null;
>
>     @Before
>     public void setup() {
>         processor = new InMemWindowProcessor();
>         context = new MockProcessorContext();
>         WindowStore<String, String> store =
>             Stores.windowStoreBuilder(
>                 Stores.inMemoryWindowStore(
>                     "my-win-store",
>                     Duration.ofMinutes(10),
>                     Duration.ofSeconds(10),
>                     false
>                 ),
>                 Serdes.String(),
>                 Serdes.String()
>             )
>             .withLoggingDisabled()
>             .build();
>         store.init(context, store);
>         context.register(store, null);
>         processor.init(context);
>     }
>
>     @Test
>     public void testThings() {
>         Instant baseTime = Instant.now();
>         context.setTimestamp(baseTime.toEpochMilli());
>         context.setTopic("topic-name");
>         processor.process("key1", "value1");
>     }
> }
> {code}
>
> I ran this with Maven; {{mvn --version}} outputs:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: "unix"{noformat}
> And finally the stack trace:
> {noformat}
> -------------------------------------------------------
> T E S T S
> -------------------------------------------------------
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest) Time elapsed: 0.05 sec <<< ERROR!
> java.lang.ClassCastException: org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
> at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
> at com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
> at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
> at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
> at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
> at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
> at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
> at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
> Results :
> Tests in error:
> testThings(com.cantgetthistowork.InMemWindowProcessorTest): org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0{noformat}
>
>
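The failure mode in the description above can be sketched without any Kafka dependency. In the snippet below every type is a hypothetical stand-in (the interfaces ProcessorContext and InternalProcessorContext, and the classes MockContext, RuntimeContext and CastingStore, are defined here for illustration and are not the Kafka Streams classes). It assumes only what the stack trace shows: a store whose init downcasts the public context type to an internal subtype, so a test mock that implements just the public interface fails with a ClassCastException.

```java
// Illustration only: all types here are stand-ins, not Kafka Streams classes.
class CastFailureSketch {
    // Returns true if initializing a store with the given context throws
    // a ClassCastException, false if initialization succeeds.
    static boolean initFails(ProcessorContext ctx) {
        try {
            new CastingStore().init(ctx);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("runtime context fails: " + initFails(new RuntimeContext()));
        System.out.println("mock context fails: " + initFails(new MockContext()));
    }
}

// Public-facing context type (stand-in).
interface ProcessorContext {
    String applicationId();
}

// Internal subtype that only the runtime implements (stand-in).
interface InternalProcessorContext extends ProcessorContext {
    String internalDetail();
}

// Test mock: implements the public interface only.
final class MockContext implements ProcessorContext {
    @Override
    public String applicationId() {
        return "mock-app";
    }
}

// Runtime context: implements the internal subtype as well.
final class RuntimeContext implements InternalProcessorContext {
    @Override
    public String applicationId() {
        return "real-app";
    }

    @Override
    public String internalDetail() {
        return "internal";
    }
}

// Store whose init assumes it always receives the internal subtype.
final class CastingStore {
    private InternalProcessorContext context;

    void init(ProcessorContext context) {
        // Unchecked assumption: throws ClassCastException for MockContext.
        this.context = (InternalProcessorContext) context;
    }
}
```

Under these assumptions the store initializes with RuntimeContext and fails with MockContext, which mirrors the shape of the reported error but says nothing about how the actual fix is implemented.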
--
This message was sent by Atlassian Jira
(v8.3.4#803005)