Posted to user@beam.apache.org by amir bahmanyari <am...@yahoo.com> on 2016/05/10 22:52:16 UTC

Learnt lessons to share

Hi Colleagues,

I have been having issues running my Beam / Kafka app for more than a week. I kept getting intermittent ClassNotFoundExceptions left and right, until I accidentally changed the name of one of the inner static classes. When I rebuilt, repackaged and redeployed, I noticed that Flink still complained about that class, but under its old name:

Caused by: java.lang.ClassNotFoundException: bench.flinkspark.flink.ReadFromKafka2$CompLRRecLengthFn
CompLRRecLengthFn was the old name which I had changed to CompLRRecFn (no Length).
So I went back and restarted the Flink JobManager and the cluster's TaskManagers, and that "class not found" issue went away. It therefore seems we must recycle the Flink servers after redeploying, but I am not sure how often. Any suggestions, please?
I am still getting an exception regarding that inner static class, but at least it is now referred to by its true name, CompLRRecFn:
java.lang.IllegalArgumentException: unable to deserialize bench.flinkspark.flink.ReadFromKafka2$CompLRRecFn@62dae245
        at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
        at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:93)
        at org.apache.beam.sdk.transforms.ParDo$Bound.<init>(ParDo.java:722)
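For context, Beam clones every DoFn through Java serialization before running it (that is the SerializableUtils.clone call in the trace above), so the fn and everything it holds on to must be serializable. I don't have the original CompLRRecFn, so the field, constructor and logic below are my own placeholders, and the DoFn method style follows the current SDK (your snapshot may use a direct processElement override instead); this is only a minimal sketch of the shape ParDo expects:

    import org.apache.beam.sdk.transforms.DoFn;

    public class ReadFromKafka2 {

      // A static nested DoFn: being static, it does not capture the enclosing
      // ReadFromKafka2 instance, so only its own fields need to be serializable.
      static class CompLRRecFn extends DoFn<String, Integer> {

        private final String delimiter;  // String is serializable, so this field is safe

        CompLRRecFn(String delimiter) {
          this.delimiter = delimiter;
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
          // Placeholder logic: emit the field count of each incoming record.
          c.output(c.element().split(delimiter).length);
        }
      }
    }

A non-static inner class, or a field holding something non-serializable (a client, a connection, etc.), is the usual cause of that "unable to deserialize" message.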

I commented out the invocation of that inner static class. Now p.run() reaches the RUNNING state, waits for incoming data for about a second, and then quickly switches to FINISHED.

05/10/2016 15:44:52     Job execution switched to status RUNNING.
05/10/2016 15:44:52     Source: Custom File source -> Flat Map -> Timestamps/Watermarks -> Flat Map -> Flat Map -> Flat Map -> Flat Map(1/1) switched to SCHEDULED
05/10/2016 15:44:52     Source: Custom File source -> Flat Map -> Timestamps/Watermarks -> Flat Map -> Flat Map -> Flat Map -> Flat Map(1/1) switched to DEPLOYING
05/10/2016 15:44:52     Source: Custom File source -> Flat Map -> Timestamps/Watermarks -> Flat Map -> Flat Map -> Flat Map -> Flat Map(1/1) switched to RUNNING
05/10/2016 15:44:52     Source: Custom File source -> Flat Map -> Timestamps/Watermarks -> Flat Map -> Flat Map -> Flat Map -> Flat Map(1/1) switched to FINISHED
05/10/2016 15:44:52     Job execution switched to status FINISHED.



Isn't the p.run() thread supposed to stay alive to receive data as a Kafka consumer?
Any idea why it switched to FINISHED immediately, please?
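One observation: the log above shows a "Custom File source", which is a bounded source, and a pipeline whose only input is bounded finishes as soon as that input is consumed. To keep the job in RUNNING as a Kafka consumer, the read has to come from an unbounded source. Below is a sketch of that using Beam's KafkaIO; the broker address and topic name are placeholders, and the exact KafkaIO surface has changed across Beam releases, so treat it as an illustration against a recent SDK rather than the snapshot you are on:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KafkaReadSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // KafkaIO.read() produces an unbounded PCollection, so on the Flink runner
        // the job stays in RUNNING and keeps consuming until it is cancelled.
        p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("kafkahost:9092")        // placeholder broker address
                .withTopic("benchmark")                        // placeholder topic name
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())                            // keep just KV<key, value>
            .apply(Values.<String>create());                   // downstream ParDos go here

        p.run();
      }
    }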
Cheers + thanks again, everyone.