Posted to users@kafka.apache.org by Michelle Francois <mi...@gmail.com> on 2019/12/29 23:05:56 UTC

StateStore extends UnicastRemoteObject

Hello,
I want two-way communication in Apache Kafka. Since a Kafka Streams
topology cannot contain cycles, my supervisor suggested that I use State
Stores as remote objects.

I created custom State Stores as described here:
https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores

(I did not use interactive queries; I just created some custom state stores
for my Kafka Processors.)

If I declare the state store as:

    public class MyCustomStore<K,V> implements StateStore,
            MyWriteableCustomStore<String, String>, Serializable

and pass an instance of it to a remote method of another remote object, it
is passed by value because it implements Serializable. I want it to be
passed by reference.
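
To illustrate the pass-by-value behaviour, here is a minimal sketch that uses only the JDK. It assumes that RMI marshals a Serializable, non-exported argument roughly the way plain Java serialization does. The names SerializableStore and PassByValueDemo are hypothetical stand-ins and are not Kafka Streams types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a store; this is NOT a Kafka Streams StateStore.
class SerializableStore implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Map<String, String> data = new HashMap<>();

    void put(String key, String value) { data.put(key, value); }
    String get(String key) { return data.get(key); }
}

class PassByValueDemo {
    // Serializes and deserializes the argument. This approximates what happens
    // to a Serializable argument before a remote method receives it.
    static SerializableStore copyLikeRmi(SerializableStore store)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(store);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SerializableStore) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerializableStore local = new SerializableStore();
        SerializableStore received = copyLikeRmi(local);

        // The "remote" side writes to its copy only.
        received.put("k", "written remotely");

        System.out.println(local.get("k"));    // prints "null"
        System.out.println(received.get("k")); // prints "written remotely"
    }
}
```

The write lands in the deserialized copy, so the original store never sees it. That is the behaviour I want to avoid.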

I think that, in order to be passed by reference, my custom State Store
should extend UnicastRemoteObject, like this:

    public class MyCustomStore<K,V> extends UnicastRemoteObject
            implements StateStore, MyWriteableCustomStore<String, String>
    ...
    public MyCustomStore(...) throws RemoteException {
        ...
    }
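
For comparison, here is a minimal sketch of by-reference semantics in RMI, using only the JDK. It exports a plain object with UnicastRemoteObject.exportObject instead of extending UnicastRemoteObject; both are standard ways to make an object remote. RemoteStoreAccess, RemoteStoreAccessImpl and PassByReferenceDemo are hypothetical names, and forcing the RMI host name to 127.0.0.1 is an assumption made only so that the sketch runs on a single machine.

```java
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical remote view of a store. Every remote method must declare
// RemoteException.
interface RemoteStoreAccess extends Remote {
    void put(String key, String value) throws RemoteException;
    String get(String key) throws RemoteException;
}

// Plain implementation class: it does not extend UnicastRemoteObject.
class RemoteStoreAccessImpl implements RemoteStoreAccess {
    private final Map<String, String> data = new ConcurrentHashMap<>();

    @Override public void put(String key, String value) { data.put(key, value); }
    @Override public String get(String key) { return data.get(key); }
}

class PassByReferenceDemo {
    // Writes through the RMI stub, then reads from the original object.
    static String writeThroughStub(String key, String value) throws RemoteException {
        // Assumption: use loopback so the sketch does not depend on how the
        // local host name resolves.
        System.setProperty("java.rmi.server.hostname", "127.0.0.1");

        RemoteStoreAccessImpl impl = new RemoteStoreAccessImpl();
        // Port 0 lets RMI choose an anonymous port.
        RemoteStoreAccess stub =
                (RemoteStoreAccess) UnicastRemoteObject.exportObject(impl, 0);
        try {
            stub.put(key, value);  // the call travels through the stub
            return impl.get(key);  // the original object observes the write
        } finally {
            // Unexport so no exported object keeps the JVM alive.
            UnicastRemoteObject.unexportObject(impl, true);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(writeThroughStub("k", "written through stub"));
    }
}
```

A stub, unlike a serialized copy, forwards every call to the one exported object, which is the behaviour I am after for the store.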
With this change, I get the following exception at initialization:

ERROR stream-thread [site-client-StreamThread-1] Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread:744)
java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:226)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:225)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:88)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:314)
    at org.apache.kafka.streams.processor.StreamThread.runOnce(StreamThread.java:824)
    at org.apache.kafka.streams.processor.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.StreamThread.run(StreamThread.java:736)


I have not included the code of my State Stores since it consists of
hundreds of lines. If necessary, I will post it.

The Kafka Streams version in the cluster I use is
kafka-streams-2.0.0.3.1.0.0-78.jar (in the kafka-broker/libs folder).

Whenever an instance of my custom State Store is created, I wrap the
constructor call in a try-catch block, since the constructor throws
RemoteException.

Thanks in advance

Re: StateStore extends UnicastRemoteObject

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Michelle,

Are you sure you are not passing null instead of your custom store to
your topology by mistake?
What does the implementation of the `build()` method of your
`MyCustomStoreBuilder` look like?
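
As a sketch of how a null could reach the topology: if `build()` catches the RemoteException from the constructor and carries on, it returns null whenever construction fails. The types below (`SimpleStoreBuilder`, `RemoteBackedStore`, `SwallowingBuilder`, `FailFastBuilder`) are hypothetical stand-ins written against the JDK only. They are not the Kafka Streams `StoreBuilder` or `StateStore` interfaces, and this is only a guess at what your code might look like.

```java
import java.rmi.RemoteException;

// Hypothetical stand-in for a store builder; NOT Kafka's StoreBuilder.
interface SimpleStoreBuilder<T> {
    T build();
}

// Hypothetical store whose constructor may fail with RemoteException,
// as the constructor of a UnicastRemoteObject subclass can.
class RemoteBackedStore {
    RemoteBackedStore(boolean failOnCreate) throws RemoteException {
        if (failOnCreate) {
            throw new RemoteException("simulated export failure");
        }
    }
}

// Problematic pattern: the exception is swallowed and null is returned.
class SwallowingBuilder implements SimpleStoreBuilder<RemoteBackedStore> {
    private final boolean failOnCreate;

    SwallowingBuilder(boolean failOnCreate) { this.failOnCreate = failOnCreate; }

    @Override
    public RemoteBackedStore build() {
        RemoteBackedStore store = null;
        try {
            store = new RemoteBackedStore(failOnCreate);
        } catch (RemoteException e) {
            // Swallowed: the caller never learns that construction failed.
        }
        return store; // null if the constructor threw
    }
}

// Safer pattern: fail at the point of construction with the real cause.
class FailFastBuilder implements SimpleStoreBuilder<RemoteBackedStore> {
    private final boolean failOnCreate;

    FailFastBuilder(boolean failOnCreate) { this.failOnCreate = failOnCreate; }

    @Override
    public RemoteBackedStore build() {
        try {
            return new RemoteBackedStore(failOnCreate);
        } catch (RemoteException e) {
            throw new IllegalStateException("Could not create store", e);
        }
    }
}

class BuilderDemo {
    public static void main(String[] args) {
        System.out.println(new SwallowingBuilder(true).build()); // prints "null"
        new FailFastBuilder(true).build(); // throws IllegalStateException
    }
}
```

With the fail-fast variant, the original RemoteException would appear in the log instead of a later NullPointerException.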

Best,
Bruno
