Posted to user@flink.apache.org by HG <ha...@gmail.com> on 2022/02/04 14:29:49 UTC

java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode

Hi,

I am developing my Flink application.
To start, I have built a class that reads events from Kafka and prints them
with datastream.print().

The job runs fine every time, but starting with the second run I see this in
the standalone session log:

2022-02-04 15:16:30,801 WARN  org.apache.kafka.common.utils.Utils                [] - Failed to close KafkaClient with type org.apache.kafka.clients.NetworkClient
java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode
        at org.apache.kafka.common.network.Selector.close(Selector.java:806) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at org.apache.kafka.common.network.Selector.close(Selector.java:365) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.Selector$CloseMode
        at java.net.URLClassLoader.findClass(URLClassLoader.java:476) ~[?:?]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
        ... 6 more
2022-02-04 15:16:30,802 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: Kafka Source -> Sink: Print to Std. Out closed.

Am I doing something wrong?

This is basically the gist of the code:

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(brokers)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
        .setValueOnlyDeserializer(new SimpleStringSchema())
        //.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .build();

//withIdleness.duration()
//env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
DataStream<String> ds = env.fromSource(source,
        WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");

ds.print();
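
For completeness, the idleness variant hinted at in the commented-out line
would look roughly like the sketch below (the 30-second value is an arbitrary
choice of mine, and it needs java.time.Duration):

// Sketch only: same env.fromSource call, but with an idleness timeout on the
// watermark strategy so idle partitions do not hold back watermarks.
DataStream<String> dsWithIdleness = env.fromSource(
        source,
        WatermarkStrategy.<String>forMonotonousTimestamps()
                .withIdleness(Duration.ofSeconds(30)),
        "Kafka Source");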

Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode

Posted by HG <ha...@gmail.com>.
Super
Thanks

On Mon, 7 Feb 2022 at 13:04, Chesnay Schepler <ch...@apache.org> wrote:

> I think you can safely ignore this warning. It shouldn't cause any harm,
> but I will file a ticket nonetheless.
>
> [...]

Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode

Posted by Chesnay Schepler <ch...@apache.org>.
I think you can safely ignore this warning. It shouldn't cause any harm, 
but I will file a ticket nonetheless.
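
As an aside: since the job is meant to be unbounded anyway, you could simply
drop the .setBounded(OffsetsInitializer.latest()) call; a KafkaSource built
without a stopping offset stays unbounded. A rough sketch, reusing the
variables from your snippet:

// Sketch only: same source as in the gist, just without setBounded(...),
// so the streaming job keeps reading instead of stopping at the latest offsets.
KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(brokers)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setStartingOffsets(OffsetsInitializer.earliest())
        .build();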

On 07/02/2022 12:52, HG wrote:
> I have nothing like that in the config (flink-conf.yaml).
>
> Just downloaded the software and did bin/start-cluster.sh
>
> On Mon, 7 Feb 2022 at 10:52, Chesnay Schepler <ch...@apache.org> wrote:
>
>     I meant in the Flink config of the cluster you are submitting the
>     jobs to.
>     Specifically whether classloader.check-leaked-classloader was set
>     to false.
>
>     On 07/02/2022 10:28, HG wrote:
>>     Hi,
>>
>>     Well I have set :
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.setMaxParallelism(5);
>>     env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>     env.enableCheckpointing(500);
>>     On the other hand .setBounded(OffsetsInitializer.latest())
>>     Perhaps that is a bit of a conflict? The job should be unbounded
>>     anyway. When I cancel it (the unbounded) via the GUI and start it
>>     again I do not see the same issue.
>>     So perhaps not very important. Regards Hans
>>
>>
>>     On Mon, 7 Feb 2022 at 09:23, Chesnay Schepler <ch...@apache.org> wrote:
>>
>>         Have you set anything beyond the defaults in the Flink
>>         configuration?
>>
>>         This could just be noise with some Kafka stuff running in the
>>         background while Flink is shutting things down (and closing
>>         the classloader).
>>
>>         On 04/02/2022 15:29, HG wrote:
>>>         [...]
>>
>>
>

Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode

Posted by HG <ha...@gmail.com>.
I have nothing like that in the config (flink-conf.yaml).

Just downloaded the software and did bin/start-cluster.sh
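
For reference, if I had set it, I assume it would be a single line in
conf/flink-conf.yaml like the one below (as far as I know the option defaults
to true):

# sketch only; not present in my config
classloader.check-leaked-classloader: false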

On Mon, 7 Feb 2022 at 10:52, Chesnay Schepler <ch...@apache.org> wrote:

> I meant in the Flink config of the cluster you are submitting the jobs to.
> Specifically whether classloader.check-leaked-classloader was set to
> false.
>
> On 07/02/2022 10:28, HG wrote:
>
> Hi,
>
> Well I have set :
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setMaxParallelism(5);
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.enableCheckpointing(500);
>
> On the other hand .setBounded(OffsetsInitializer.latest())
>
> Perhaps that is a bit of a conflict?
> The job should be unbounded anyway.
> When I cancel it  (the unbounded) via the GUI and start it again I do not see the same issue.
>
> So perhaps not very important.
> Regards Hans
>
>
> On Mon, 7 Feb 2022 at 09:23, Chesnay Schepler <ch...@apache.org> wrote:
>
>> Have you set anything beyond the defaults in the Flink configuration?
>>
>> This could just be noise with some Kafka stuff running in the background
>> while Flink is shutting things down (and closing the classloader).
>>
>> On 04/02/2022 15:29, HG wrote:
>>
>> [...]
>

Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode

Posted by Chesnay Schepler <ch...@apache.org>.
Have you set anything beyond the defaults in the Flink configuration?

This could just be noise with some Kafka stuff running in the background 
while Flink is shutting things down (and closing the classloader).
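
Roughly what happens, as a toy illustration (nothing Flink-specific, and the
jar path and class name below are made up): once a URLClassLoader has been
closed, classes that were not loaded before the close can no longer be
resolved, which matches the NoClassDefFoundError/ClassNotFoundException
pattern in your log.

import java.net.URL;
import java.net.URLClassLoader;

public class ClosedLoaderDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical jar path, purely for illustration.
        URLClassLoader loader = new URLClassLoader(
                new URL[] { new URL("file:/tmp/some-user-code.jar") }, null);
        loader.close();
        // A class from the jar that was never loaded before close() now fails:
        loader.loadClass("com.example.SomeClass"); // throws ClassNotFoundException
    }
}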

On 04/02/2022 15:29, HG wrote:
> [...]