Posted to users@kafka.apache.org by Dumitru-Nicolae Marasoui <ni...@ovoenergy.com> on 2020/04/20 19:19:28 UTC

Unknown topic at org.apache.kafka.streams.TopologyTestDriver.pipeRecord

Hello kafka community,
I am getting the stack trace below while attempting an integration test for
a new kafka-streams ETL between two topics (where the source topic is a new
one).
The way the local framework is organized, a local Confluent stack (such as
the schema registry server and a broker) is started first, and a script
takes a project file, generates the topics and injects some messages into
them.
After that there must be a step that I am missing (because these
integration tests can run without the broker or registry server running at
all).
The thing is that when I run the integration test I created, I get the
exception below.
Thank you for your help,
Nicu

Unknown topic: identity_users_v1
java.lang.IllegalArgumentException: Unknown topic: identity_users_v1
at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:488)
at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:742)
at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
at org.apache.kafka.streams.TestInputTopic.pipeRecordList(TestInputTopic.java:188)
at com.ovoenergy.tests.testtools.TopologyTest$TopologyTestDriverOps.pushRecordsTo(TopologyTests.scala:88)
at com.ovoenergy.globaltopics.pipelines.orion.UserEventV1PipelineTest.$anonfun$new$1(UserEventV1PipelineTest.scala:31)
at org.scalatest.fixture.TestSuite$TestFunAndConfigMap.$anonfun$apply$1(TestSuite.scala:132)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.fixture.TestSuite$TestFunAndConfigMap.apply(TestSuite.scala:132)
at com.ovoenergy.tests.testtools.FlatSpecTopologyTest.withFixture(TopologyTests.scala:28)
at org.scalatest.fixture.FlatSpecLike.invokeWithFixture$1(FlatSpecLike.scala:2127)
at org.scalatest.fixture.FlatSpecLike.$anonfun$runTest$1(FlatSpecLike.scala:2138)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
at org.scalatest.fixture.FlatSpecLike.runTest(FlatSpecLike.scala:2138)
at org.scalatest.fixture.FlatSpecLike.runTest$(FlatSpecLike.scala:2119)
at org.scalatest.fixture.FlatSpec.runTest(FlatSpec.scala:226)
at org.scalatest.fixture.FlatSpecLike.$anonfun$runTests$1(FlatSpecLike.scala:2181)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:393)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:370)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:407)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:376)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
at org.scalatest.fixture.FlatSpecLike.runTests(FlatSpecLike.scala:2181)
at org.scalatest.fixture.FlatSpecLike.runTests$(FlatSpecLike.scala:2180)
at org.scalatest.fixture.FlatSpec.runTests(FlatSpec.scala:226)
at org.scalatest.Suite.run(Suite.scala:1124)
at org.scalatest.Suite.run$(Suite.scala:1106)
at org.scalatest.fixture.FlatSpec.org$scalatest$fixture$FlatSpecLike$$super$run(FlatSpec.scala:226)
at org.scalatest.fixture.FlatSpecLike.$anonfun$run$1(FlatSpecLike.scala:2202)
at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
at org.scalatest.fixture.FlatSpecLike.run(FlatSpecLike.scala:2202)
at org.scalatest.fixture.FlatSpecLike.run$(FlatSpecLike.scala:2201)
at org.scalatest.fixture.FlatSpec.run(FlatSpec.scala:226)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1349)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1343)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1033)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1011)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)


-- 
Thank you,
Nicolae Marasoiu
Scala Engineer
Orion, OVO Group

Re: Unknown topic at org.apache.kafka.streams.TopologyTestDriver.pipeRecord

Posted by "Matthias J. Sax" <mj...@apache.org>.
Can you inspect the output of `Topology#describe()#toString()`?

It might also be a bug. What version are you using? Can you reproduce
the issue reliably (if yes, can you share your code so we can have a more
detailed look)?


-Matthias
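
A minimal sketch of that inspection in Scala, assuming Scala 2.13+ and the
kafka-streams artifact on the classpath. The topology is a hypothetical
stand-in (it only copies `identity_users_v1` to an assumed `users_out`
topic), and `DescribeTopology`, `buildTopology` and `sourceTopics` are
illustrative names, not part of the poster's code:

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{StreamsBuilder, Topology, TopologyDescription}
import org.apache.kafka.streams.kstream.{Consumed, Produced}
import scala.jdk.CollectionConverters._

object DescribeTopology {
  // Hypothetical stand-in for the real pipeline: copies one topic to another.
  def buildTopology(source: String, sink: String): Topology = {
    val builder = new StreamsBuilder()
    builder
      .stream[String, String](source, Consumed.`with`(Serdes.String(), Serdes.String()))
      .to(sink, Produced.`with`(Serdes.String(), Serdes.String()))
    builder.build()
  }

  // Topic names of every source node. A source subscribed by pattern may have
  // no fixed topic set, so a null set is treated as empty.
  def sourceTopics(topology: Topology): Set[String] = {
    val nodes = topology.describe().subtopologies().asScala.toList
      .flatMap(_.nodes().asScala.toList)
    nodes.collect { case s: TopologyDescription.Source =>
      Option(s.topicSet()).map(_.asScala.toSet).getOrElse(Set.empty[String])
    }.flatten.toSet
  }

  def main(args: Array[String]): Unit = {
    val topology = buildTopology("identity_users_v1", "users_out")
    println(topology.describe())    // full description of the topology
    println(sourceTopics(topology)) // only the source topic names
  }
}
```

Comparing the printed source topics with the name passed to
`createInputTopic` should show whether the two agree.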

On 4/21/20 7:02 AM, Murilo Tavares wrote:
> If topic is "users", do you have any idea where the name on the exception
> comes from? The exception says "Unknown topic: identity_users_v1".


Re: Unknown topic at org.apache.kafka.streams.TopologyTestDriver.pipeRecord

Posted by Murilo Tavares <mu...@gmail.com>.
If the topic is "users", do you have any idea where the name in the exception
comes from? The exception says "Unknown topic: identity_users_v1".


On Tue, 21 Apr 2020 at 06:08, Dumitru-Nicolae Marasoui <
nicolae.marasoiu@ovoenergy.com> wrote:

> Hi Liam,
> It is "users".
> Thanks

Re: Unknown topic at org.apache.kafka.streams.TopologyTestDriver.pipeRecord

Posted by Dumitru-Nicolae Marasoui <ni...@ovoenergy.com>.
Hi Liam,
It is "users".
Thanks

On Tue, 21 Apr 2020 at 10:23, Liam Clarke-Hutchinson <
liam.clarke@adscale.co.nz> wrote:

> Hi Nicu,
>
> I'd need to see more context to help - for example, what is the value of
> `topicName`? I've just finished writing Streams tests using the test
> driver, so can hopefully help with more code :)
>
> Cheers,
>
> Liam Clarke-Hutchinson



Re: Unknown topic at org.apache.kafka.streams.TopologyTestDriver.pipeRecord

Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
Hi Nicu,

I'd need to see more context to help - for example, what is the value of
`topicName`? I've just finished writing Streams tests using the test
driver, so can hopefully help with more code :)

Cheers,

Liam Clarke-Hutchinson
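
To illustrate why the value of `topicName` matters, here is a minimal sketch
in Scala, assuming kafka-streams and kafka-streams-test-utils 2.4+ on the
classpath. The topology, the `users_out` topic and the names
`InputTopicNameCheck`, `buildTopology` and `pipeTo` are hypothetical. It
pipes one record through a `TestInputTopic` created with a given name and
captures the outcome; a name that is not a source topic of the topology is
expected to fail with the same `IllegalArgumentException` as in the stack
trace:

```scala
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, Topology, TopologyTestDriver}
import org.apache.kafka.streams.kstream.{Consumed, Produced}
import scala.util.Try

object InputTopicNameCheck {
  private val strings = Serdes.String()

  // Hypothetical topology whose only source topic is "identity_users_v1".
  def buildTopology(): Topology = {
    val builder = new StreamsBuilder()
    builder
      .stream[String, String]("identity_users_v1", Consumed.`with`(strings, strings))
      .to("users_out", Produced.`with`(strings, strings))
    builder.build()
  }

  // Pipes one record into `topicName` and returns the value read from the
  // sink topic, or the exception thrown by the test driver.
  def pipeTo(topicName: String): Try[String] = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topic-name-check")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092") // never contacted
    val driver = new TopologyTestDriver(buildTopology(), props)
    try {
      Try {
        val in = driver.createInputTopic(topicName, strings.serializer(), strings.serializer())
        val out = driver.createOutputTopic("users_out", strings.deserializer(), strings.deserializer())
        in.pipeInput("user-1", "hello")
        out.readValue()
      }
    } finally driver.close()
  }

  def main(args: Array[String]): Unit = {
    println(pipeTo("identity_users_v1")) // name matches the topology's source topic
    println(pipeTo("users"))             // name unknown to the topology
  }
}
```

No broker or schema registry is involved here: the driver only knows the
topics declared in the topology it was given.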

On Tue, Apr 21, 2020 at 8:40 PM Dumitru-Nicolae Marasoui <
nicolae.marasoiu@ovoenergy.com> wrote:

> Hi Murilo & community,
> Thanks for your answer,
> I see that in the code this is done just before:
>
> val topicInput = inner.createInputTopic(topicName, serdeKey.serializer(),
> serdeValue.serializer())
> val input = records.map { case (k, v) => new TestRecord(k, v) }
> topicInput.pipeRecordList(input.asJava)
>
> What could be the explanation?
> Thank you,
> Nicu
>
> On Tue, 21 Apr 2020 at 04:04, Murilo Tavares <mu...@gmail.com> wrote:
>
> > Hi Dumitru
> > The TopologyTestDriver you are using was designed to unit test your
> > topology, and will not work with the stack you run locally.
> > That said, if you want to test your topology, you first need to create
> > the fake input topic by calling “topologyDriver.createInputTopic()”
> > (assuming you are using v2.4+) for every input topic you are using.
> > Since you use “pipeRecordList”, make sure your records are all to the
> > same topic.
> > Murilo
> >
> >
> > On Mon, Apr 20, 2020 at 3:27 PM Dumitru-Nicolae Marasoui <
> > nicolae.marasoiu@ovoenergy.com> wrote:
> >
> > > Hello kafka community,
> > > I am getting the stack trace below in an attempt at an integration
> > > test for a new kafka-streams ETL between two topics (where the source
> > > topic is a new one).
> > > The way the local framework is organized, initially a local confluent
> > > stack is started such as the schema registry server and a broker, and
> > > a script takes a project file and generates topics and injects some
> > > messages in them.
> > > After that there must be a step that I am missing (because these
> > > integration tests can run without the broker or registry server
> > > running at all)
> > > Thing is that when I run the integration test I created I get the
> > > following Exception:
> > > Thank you for help,
> > > Nicu
> > >
> > > Unknown topic: identity_users_v1
> > > java.lang.IllegalArgumentException: Unknown topic: identity_users_v1
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:488)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:742)
> > > at
> > >
> >
> org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.TestInputTopic.pipeRecordList(TestInputTopic.java:188)
> > > at
> > >
> > >
> >
> com.ovoenergy.tests.testtools.TopologyTest$TopologyTestDriverOps.pushRecordsTo(TopologyTests.scala:88)
> > > at
> > >
> > >
> >
> com.ovoenergy.globaltopics.pipelines.orion.UserEventV1PipelineTest.$anonfun$new$1(UserEventV1PipelineTest.scala:31)
> > > at
> > >
> > >
> >
> org.scalatest.fixture.TestSuite$TestFunAndConfigMap.$anonfun$apply$1(TestSuite.scala:132)
> > > at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
> > > at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
> > > at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> > > at
> > >
> > >
> >
> org.scalatest.fixture.TestSuite$TestFunAndConfigMap.apply(TestSuite.scala:132)
> > > at
> > >
> > >
> >
> com.ovoenergy.tests.testtools.FlatSpecTopologyTest.withFixture(TopologyTests.scala:28)
> > > at
> > >
> > >
> >
> org.scalatest.fixture.FlatSpecLike.invokeWithFixture$1(FlatSpecLike.scala:2127)
> > > at
> > >
> > >
> >
> org.scalatest.fixture.FlatSpecLike.$anonfun$runTest$1(FlatSpecLike.scala:2138)
> > > at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
> > > at org.scalatest.fixture.FlatSpecLike.runTest(FlatSpecLike.scala:2138)
> > > at org.scalatest.fixture.FlatSpecLike.runTest$(FlatSpecLike.scala:2119)
> > > at org.scalatest.fixture.FlatSpec.runTest(FlatSpec.scala:226)
> > > at
> > >
> > >
> >
> org.scalatest.fixture.FlatSpecLike.$anonfun$runTests$1(FlatSpecLike.scala:2181)
> > > at
> > org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:393)
> > > at scala.collection.immutable.List.foreach(List.scala:392)
> > > at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
> > > at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:370)
> > > at
> > org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:407)
> > > at scala.collection.immutable.List.foreach(List.scala:392)
> > > at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
> > > at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:376)
> > > at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
> > > at org.scalatest.fixture.FlatSpecLike.runTests(FlatSpecLike.scala:2181)
> > > at
> org.scalatest.fixture.FlatSpecLike.runTests$(FlatSpecLike.scala:2180)
> > > at org.scalatest.fixture.FlatSpec.runTests(FlatSpec.scala:226)
> > > at org.scalatest.Suite.run(Suite.scala:1124)
> > > at org.scalatest.Suite.run$(Suite.scala:1106)
> > > at org.scalatest.fixture.FlatSpec.org
> > > $scalatest$fixture$FlatSpecLike$$super$run(FlatSpec.scala:226)
> > > at
> > >
> >
> org.scalatest.fixture.FlatSpecLike.$anonfun$run$1(FlatSpecLike.scala:2202)
> > > at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
> > > at org.scalatest.fixture.FlatSpecLike.run(FlatSpecLike.scala:2202)
> > > at org.scalatest.fixture.FlatSpecLike.run$(FlatSpecLike.scala:2201)
> > > at org.scalatest.fixture.FlatSpec.run(FlatSpec.scala:226)
> > > at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
> > > at
> > >
> > >
> >
> org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1349)
> > > at
> > >
> > >
> >
> org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1343)
> > > at scala.collection.immutable.List.foreach(List.scala:392)
> > > at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343)
> > > at
> > >
> > >
> >
> org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1033)
> > > at
> > >
> > >
> >
> org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1011)
> > > at
> > >
> > >
> >
> org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509)
> > > at
> > >
> > >
> >
> org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011)
> > > at org.scalatest.tools.Runner$.run(Runner.scala:850)
> > > at org.scalatest.tools.Runner.run(Runner.scala)
> > > at
> > >
> > >
> >
> org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
> > > at
> > >
> > >
> >
> org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
> > >
> > >
> > > --
> > > Thank you,
> > > Nicolae Marasoiu
> > > Scala Engineer
> > > Orion, OVO Group
> > >
> >
>
>
> --
> Thank you,
> Nicolae Marasoiu
> Scala Engineer
> Orion, OVO Group
>

Re: Unknown topic at org.apache.kafka.streams.TopologyTestDriver.pipeRecord

Posted by Dumitru-Nicolae Marasoui <ni...@ovoenergy.com>.
Hi Murilo & community,
Thanks for your answer,
I see that in the code this is done just before:

val topicInput = inner.createInputTopic(topicName, serdeKey.serializer(),
serdeValue.serializer())
val input = records.map { case (k, v) => new TestRecord(k, v) }
topicInput.pipeRecordList(input.asJava)

What could be the explanation?
Thank you,
Nicu

On Tue, 21 Apr 2020 at 04:04, Murilo Tavares <mu...@gmail.com> wrote:

> Hi Dumitru
> The TopologyTestDriver you are using was designed to unit test your
> topology, and will not work with the stack you run locally.
> That said, if you want to test your topology, you first need to create the
> fake input topic by calling “topologyDriver.createInputTopic()” (assuming
> you are using v2.4+) for every input topic you are using.
> Since you use “pipeRecordList”, make sure your records are all to the same
> topic.
> Murilo


-- 
Thank you,
Nicolae Marasoiu
Scala Engineer
Orion, OVO Group

Re: Unknown topic at org.apache.kafka.streams.TopologyTestDriver.pipeRecord

Posted by Murilo Tavares <mu...@gmail.com>.
Hi Dumitru
The TopologyTestDriver you are using was designed to unit test your
topology, and will not work with the stack you run locally.
That said, if you want to test your topology, you first need to create the
fake input topic by calling “topologyDriver.createInputTopic()” (assuming
you are using v2.4+) for every input topic you are using.
Since you use “pipeRecordList”, make sure all of your records go to the same
topic.
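
A minimal sketch of that setup, in Scala against the Java API (2.4+). The
pass-through topology, the output topic name "users_out" and the object name
are assumptions made up for illustration, not taken from your project. It
also shows what, as far as I can tell, triggers "Unknown topic":
createInputTopic() only hands back a handle, and the driver rejects a record
at pipe time when the topic name is not a source topic of the topology it was
built with.

```scala
import java.util.Properties

import scala.util.Try

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{Consumed, Produced}
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, TopologyTestDriver}

object TopologyTestDriverSketch extends App {
  // The input name must match a source topic of the topology under test.
  val inputTopicName  = "identity_users_v1"
  val outputTopicName = "users_out" // hypothetical output topic

  val stringSerde = Serdes.String()

  // Stand-in topology (assumption): copies records from input to output.
  val builder = new StreamsBuilder()
  builder
    .stream[String, String](inputTopicName, Consumed.`with`(stringSerde, stringSerde))
    .to(outputTopicName, Produced.`with`(stringSerde, stringSerde))

  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-sketch")
  // Never contacted: the driver runs the topology in-process, without a broker.
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092")

  val driver = new TopologyTestDriver(builder.build(), props)
  try {
    val input = driver.createInputTopic(
      inputTopicName, stringSerde.serializer(), stringSerde.serializer())
    val output = driver.createOutputTopic(
      outputTopicName, stringSerde.deserializer(), stringSerde.deserializer())

    // The name matches a source topic, so the record flows through.
    input.pipeInput("user-1", "created")
    val record = output.readKeyValue()
    assert(record.key == "user-1" && record.value == "created")

    // A name the topology does not read from: creating the handle works,
    // piping a record fails with IllegalArgumentException.
    val notASource = driver.createInputTopic(
      "not_a_source_topic", stringSerde.serializer(), stringSerde.serializer())
    val failure = Try(notASource.pipeInput("k", "v"))
    assert(failure.isFailure)
    assert(failure.failed.get.isInstanceOf[IllegalArgumentException])
  } finally driver.close()
}
```

If this is what happens in your test, printing topology.describe() right
before creating the driver and comparing its source topics with the name you
pipe to should show the mismatch.
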
Murilo


On Mon, Apr 20, 2020 at 3:27 PM Dumitru-Nicolae Marasoui <
nicolae.marasoiu@ovoenergy.com> wrote:

> Hello kafka community,
> I am getting the stack trace below in an attempt at an integration test for
> a new kafka-streams ETL between two topics (where the source topic is a new
> one).
> The way the local framework is organized, initially a local confluent stack
> is started such as the schema registry server and a broker, and a script
> takes a project file and generates topics and injects some messages in
> them.
> After that there must be a step that I am missing (because these
> integration tests can run without the broker or registry server running at
> all)
> Thing is that when I run the integration test I created I get the following
> Exception:
> Thank you for help,
> Nicu
>
> Unknown topic: identity_users_v1
> java.lang.IllegalArgumentException: Unknown topic: identity_users_v1
> at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:488)
> at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:742)
> at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
> at org.apache.kafka.streams.TestInputTopic.pipeRecordList(TestInputTopic.java:188)
> at com.ovoenergy.tests.testtools.TopologyTest$TopologyTestDriverOps.pushRecordsTo(TopologyTests.scala:88)
> at com.ovoenergy.globaltopics.pipelines.orion.UserEventV1PipelineTest.$anonfun$new$1(UserEventV1PipelineTest.scala:31)
> at org.scalatest.fixture.TestSuite$TestFunAndConfigMap.$anonfun$apply$1(TestSuite.scala:132)
> at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
> at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
> at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> at org.scalatest.fixture.TestSuite$TestFunAndConfigMap.apply(TestSuite.scala:132)
> at com.ovoenergy.tests.testtools.FlatSpecTopologyTest.withFixture(TopologyTests.scala:28)
> at org.scalatest.fixture.FlatSpecLike.invokeWithFixture$1(FlatSpecLike.scala:2127)
> at org.scalatest.fixture.FlatSpecLike.$anonfun$runTest$1(FlatSpecLike.scala:2138)
> at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
> at org.scalatest.fixture.FlatSpecLike.runTest(FlatSpecLike.scala:2138)
> at org.scalatest.fixture.FlatSpecLike.runTest$(FlatSpecLike.scala:2119)
> at org.scalatest.fixture.FlatSpec.runTest(FlatSpec.scala:226)
> at org.scalatest.fixture.FlatSpecLike.$anonfun$runTests$1(FlatSpecLike.scala:2181)
> at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:393)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
> at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:370)
> at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:407)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
> at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:376)
> at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
> at org.scalatest.fixture.FlatSpecLike.runTests(FlatSpecLike.scala:2181)
> at org.scalatest.fixture.FlatSpecLike.runTests$(FlatSpecLike.scala:2180)
> at org.scalatest.fixture.FlatSpec.runTests(FlatSpec.scala:226)
> at org.scalatest.Suite.run(Suite.scala:1124)
> at org.scalatest.Suite.run$(Suite.scala:1106)
> at org.scalatest.fixture.FlatSpec.org$scalatest$fixture$FlatSpecLike$$super$run(FlatSpec.scala:226)
> at org.scalatest.fixture.FlatSpecLike.$anonfun$run$1(FlatSpecLike.scala:2202)
> at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
> at org.scalatest.fixture.FlatSpecLike.run(FlatSpecLike.scala:2202)
> at org.scalatest.fixture.FlatSpecLike.run$(FlatSpecLike.scala:2201)
> at org.scalatest.fixture.FlatSpec.run(FlatSpec.scala:226)
> at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
> at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1349)
> at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1343)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343)
> at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1033)
> at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1011)
> at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509)
> at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011)
> at org.scalatest.tools.Runner$.run(Runner.scala:850)
> at org.scalatest.tools.Runner.run(Runner.scala)
> at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
> at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
>
>
> --
> Thank you,
> Nicolae Marasoiu
> Scala Engineer
> Orion, OVO Group
>