Posted to user@flink.apache.org by Manas Kale <ma...@gmail.com> on 2020/10/22 07:24:04 UTC

Flink - Kafka topic null error; happens only when running on cluster

Hi,
I am trying to write some data to a kafka topic and I have the following
situation:

monitorStateStream
   .process(new IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
   ... // Stream that outputs elements of type IDAP2Alarm
   .addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);

private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T> getFlinkKafkaProducer(String servers, String topic) {
   Properties properties = new Properties();
   properties.setProperty("bootstrap.servers", servers);
   return new FlinkKafkaProducer<T>(topic,
         (element, timestamp) -> element.serializeForKafka(),
         properties,
         FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}

/*
    This interface is used to indicate that a class may be output to Kafka.
    Since Kafka treats all data as bytes, classes that implement this interface
    have to provide an implementation for the serializeForKafka() method.
 */
public interface IDAP2JSONOutput {

    // Implement serialization logic in this method.
    ProducerRecord<byte[], byte[]> serializeForKafka();

}

public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {

    private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);

    @Override
    public ProducerRecord<byte[], byte[]> serializeForKafka() {
        byte[] rawValue;
        byte[] rawKey;
        String k = getMonitorFeatureKey().getMonitorName();
        ...

        rawValue = val.getBytes();

        LOGGER.info("value of alarms topic from idap2 alarm : " + Config.ALARMS_TOPIC);

        return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // Line 95
    }

}


Config.ALARMS_TOPIC is a static string that is read from a properties file.
When I run this code on my IDE minicluster, it runs great with no problems.
But when I submit it as a jar to the cluster, I get the following error:

Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
    at flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) ~[flink_POC-0.1.jar:?]
    at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) ~[flink_POC-0.1.jar:?]
    at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) ~[flink_POC-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]

Apparently Config.ALARMS_TOPIC is being evaluated as null. Also, the LOGGER
statement in IDAP2Alarm above is never printed when running on the Flink
cluster. To verify that the correct value of Config.ALARMS_TOPIC is read
from the configuration file, I printed it from the Config class, and it
prints correctly. So my questions are:

   - Why does this work on a minicluster but not when submitted as a jar to
   a normal cluster? I am using Flink v1.11.0 in both my POM file and the
   cluster runtime.
   - Why does the LOGGER line not get printed even though execution
   definitely reached it (as seen from the stacktrace)?

Thank you,
Manas Kale

Re: Flink - Kafka topic null error; happens only when running on cluster

Posted by Manas Kale <ma...@gmail.com>.
Hi Timo,
Sure, I have opened this <https://issues.apache.org/jira/browse/FLINK-19807>
issue on Jira.

On Fri, Oct 23, 2020 at 4:09 PM Timo Walther <tw...@apache.org> wrote:

> Hi Manas,
>
> that is a good point. Feel free to open an issue for this. It is not the
> first time that your question appeared on the mailing list.
>
> Regards,
> Timo
>
> On 23.10.20 07:22, Manas Kale wrote:
> > Hi Timo,
> > I figured it out, thanks a lot for your help.
> > Are there any articles detailing the pre-flight and cluster phases? I
> > couldn't find anything on ci.apache.org/projects/flink
> > <http://ci.apache.org/projects/flink> and I think this behaviour should
> > be documented as a warning/note.
> >
> >
> > On Thu, Oct 22, 2020 at 6:44 PM Timo Walther <twalthr@apache.org> wrote:
> >
> >     Hi Manas,
> >
> >     you can use static variables, but you need to make sure that the logic
> >     to fill the static variable is accessible and executed in all JVMs.
> >
> >     I assume `pipeline.properties` is in the JAR that you submit to the
> >     cluster, right? Then you should be able to access it through a
> >     singleton pattern instead of a static variable access.
> >
> >     Regards,
> >     Timo
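
The singleton suggestion above could look roughly like the following. This is a minimal sketch, not code from the thread: the class name `PipelineConfig` and its methods are hypothetical, and only the file name `pipeline.properties` and the key `CONFIG_TOPIC` come from the messages. The point is that the file is read lazily on first access in whichever JVM asks for it, instead of once in `main()`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical replacement for a Config class whose static fields are filled in main().
final class PipelineConfig {
    private static final String RESOURCE = "pipeline.properties";

    // Starts out null in every JVM; filled on first use in that JVM.
    private static volatile PipelineConfig instance;

    private final Properties props;

    private PipelineConfig(Properties props) {
        this.props = props;
    }

    // Lazily loads the properties file from the classpath (i.e. from the job JAR).
    static PipelineConfig getInstance() {
        PipelineConfig local = instance;
        if (local == null) {
            synchronized (PipelineConfig.class) {
                local = instance;
                if (local == null) {
                    local = fromStream(
                            PipelineConfig.class.getClassLoader().getResourceAsStream(RESOURCE));
                    instance = local;
                }
            }
        }
        return local;
    }

    // Builds a config from any stream; kept separate so the parsing can be exercised alone.
    static PipelineConfig fromStream(InputStream in) {
        if (in == null) {
            throw new IllegalStateException(RESOURCE + " not found on the classpath");
        }
        Properties p = new Properties();
        try (InputStream closing = in) {
            p.load(closing);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new PipelineConfig(p);
    }

    String get(String key) {
        return props.getProperty(key);
    }
}
```

Code that runs on the cluster would then call something like `PipelineConfig.getInstance().get("CONFIG_TOPIC")` instead of reading a static field that was only assigned in the client.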
> >
> >
> >     On 22.10.20 14:17, Manas Kale wrote:
> >      > Sorry, I messed up the code snippet in the earlier mail. The
> >      > correct one is:
> >      >
> >      > public static void main(String[] args) {
> >      >     Properties prop = new Properties();
> >      >     InputStream is = Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> >      >     prop.load(is);
> >      >
> >      >     HashMap<String, String> strMap = new HashMap<>();
> >      >     strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> >      >     new Config(strMap);
> >      >     ...
> >      > }
> >      >
> >      > public class Config {
> >      >     public static String CONFIG_TOPIC;
> >      >
> >      >     public Config(HashMap<String, String> s) {
> >      >         CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> >      >     }
> >      > }
> >      >
> >      > The value of CONFIG_TOPIC in a minicluster is properly loaded but
> >      > null when run on a cluster.
> >      >
> >      >
> >      > On Thu, Oct 22, 2020 at 5:42 PM Manas Kale <manaskale96@gmail.com> wrote:
> >      >
> >      >     Hi Timo,
> >      >     Thank you for the explanation, I can start to see why I was
> >      >     getting an exception.
> >      >     Are you saying that I cannot use static variables at all when
> >      >     trying to deploy to a cluster? I would like the variables to
> >      >     remain static and not be instance-bound as they are accessed
> >      >     from multiple classes.
> >      >     Based on my understanding of what you said, I implemented the
> >      >     following pattern:
> >      >
> >      >     public static void main(String[] args) {
> >      >         Properties prop = new Properties();
> >      >         InputStream is = Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> >      >         prop.load(is);
> >      >
> >      >         strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
> >      >         new Config(strMap, longMap);
> >      >         ...
> >      >     }
> >      >
> >      >     public class Config {
> >      >         public static String CONFIG_TOPIC;
> >      >         public static String CONFIG_KAFKA;
> >      >
> >      >         public Config(HashMap<String, String> s) {
> >      >             CONFIG_TOPIC = s.get("CONFIG_TOPIC");
> >      >             CONFIG_KAFKA = s.get("CONFIG_KAFKA");
> >      >         }
> >      >     }
> >      >
> >      >     This produces the same issue. With the easier solution that you
> >      >     listed, are you implying I use multiple instances or a singleton
> >      >     pattern of some sort?
> >      >
> >      >     On Thu, Oct 22, 2020 at 1:23 PM Timo Walther <twalthr@apache.org> wrote:
> >      >
> >      >         Hi Manas,
> >      >
> >      >         you need to make sure to differentiate between what Flink
> >      >         calls the "pre-flight phase" and the "cluster phase".
> >      >
> >      >         The pre-flight phase is where the pipeline is constructed
> >      >         and all functions are instantiated. They are then later
> >      >         serialized and sent to the cluster.
> >      >
> >      >         If you read your properties file in the `main()` method and
> >      >         store something in static variables, the content is available
> >      >         locally where the pipeline is constructed (e.g. in the
> >      >         client), but not once the function instances are sent to the
> >      >         cluster: those static variables are fresh (thus empty) in the
> >      >         cluster JVMs. You need to either make sure that the
> >      >         properties file is read again on each task manager or,
> >      >         easier, pass the parameters as constructor parameters into
> >      >         the instances so that they are shipped together with the
> >      >         function itself.
> >      >
> >      >         I hope this helps.
> >      >
> >      >         Regards,
> >      >         Timo
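
The difference between the two phases can be reproduced without a cluster. The sketch below is an illustration under stated assumptions, not Flink code: `TopicAwareFunction` and `PreFlightDemo` are hypothetical names, plain Java serialization stands in for how a function is shipped, and resetting the static field to null stands in for a fresh task manager JVM in which `main()` never ran.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a user function that is serialized in the client
// and deserialized in another JVM.
class TopicAwareFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    // Assigned in the client; static fields are not part of the serialized object.
    static String staticTopic;

    // Passed through the constructor; travels with the serialized instance.
    private final String topic;

    TopicAwareFunction(String topic) {
        this.topic = topic;
    }

    String topicFromField() {
        return topic;
    }

    String topicFromStatic() {
        return staticTopic;
    }
}

class PreFlightDemo {
    // Simulates client -> cluster shipping and returns the function as the "cluster" sees it.
    static TopicAwareFunction shipToCluster(String topic) {
        try {
            // "Pre-flight phase": runs in the client JVM.
            TopicAwareFunction.staticTopic = topic;
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new TopicAwareFunction(topic));
            }

            // Assumption: nulling the static mimics a fresh JVM where main() never ran.
            TopicAwareFunction.staticTopic = null;

            // "Cluster phase": the function instance is rebuilt from bytes.
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TopicAwareFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        TopicAwareFunction onCluster = shipToCluster("alarms");
        System.out.println("field:  " + onCluster.topicFromField());   // prints "field:  alarms"
        System.out.println("static: " + onCluster.topicFromStatic());  // prints "static: null"
    }
}
```

In the job from this thread, the analogous change would be to hand the topic to whatever object builds the `ProducerRecord` (for example as a constructor argument of the serialization logic) rather than reading `Config.ALARMS_TOPIC` inside `serializeForKafka()`.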

Re: Flink - Kafka topic null error; happens only when running on cluster

Posted by Timo Walther <tw...@apache.org>.
Hi Manas,

that is a good point. Feel free to open an issue for this. It is not the 
first time that your question appeared on the mailing list.

Regards,
Timo

On 23.10.20 07:22, Manas Kale wrote:
> Hi Timo,
> I figured it out, thanks a lot for your help.
> Are there any articles detailing the pre-flight and cluster phases? I 
> couldn't find anything on ci.apache.org/projects/flink 
> <http://ci.apache.org/projects/flink> and I think this behaviour should 
> be documented as a warning/note.
> 
> 
> On Thu, Oct 22, 2020 at 6:44 PM Timo Walther <twalthr@apache.org 
> <ma...@apache.org>> wrote:
> 
>     Hi Manas,
> 
>     you can use static variable but you need to make sure that the logic to
>     fill the static variable is accessible and executed in all JVMs.
> 
>     I assume `pipeline.properties` is in your JAR that you submit to the
>     cluster right? Then you should be able to access it through a singleton
>     pattern instead of a static variable access.
> 
>     Regards,
>     Timo
> 
> 
>     On 22.10.20 14:17, Manas Kale wrote:
>      > Sorry, I messed up the code snippet in the earlier mail. The
>     correct one
>      > is :
>      >
>      > public static void main(String[] args) {
>      >         Properties prop =new Properties();
>      >
>      > InputStream is =
>     Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
>      > prop.load(is);
>      >
>      > HashMap<String, String> strMap =new HashMap<>();
>      >
>      > strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
>      >
>      > new Config(strMap);
>      >
>      > ...
>      >
>      > }
>      >
>      > public class Config {
>      >
>      > public static StringCONFIG_TOPIC;
>      >
>      > publicConfig(HashMap<String, String> s) {
>      >
>      >      CONFIG_TOPIC = s.get("CONFIG_TOPIC");
>      >
>      > }
>      >
>      > }
>      >
>      > The value of CONFIG_TOPIC in a minicluster is properly loaded but
>     null
>      > when run on a cluster.
>      >
>      >
>      > On Thu, Oct 22, 2020 at 5:42 PM Manas Kale <manaskale96@gmail.com
>     <ma...@gmail.com>
>      > <mailto:manaskale96@gmail.com <ma...@gmail.com>>> wrote:
>      >
>      >     Hi Timo,
>      >     Thank you for the explanation, I can start to see why I was
>     getting
>      >     an exception.
>      >     Are you saying that I cannot use static variables at all when
>     trying
>      >     to deploy to a cluster? I would like the variables to remain
>     static
>      >     and not be instance-bound as they are accessed from multiple
>     classes.
>      >     Based on my understanding of what you said, I implemented the
>      >     following pattern:
>      >
>      >     public static void main(String[] args) {
>      >             Properties prop =new Properties();
>      >
>      >     InputStream is =
>     Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
>      >     prop.load(is);
>      >
>      >     strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
>      >
>      >     new Config(strMap, longMap);
>      >
>      >     ...
>      >
>      >     }
>      >
>      >     public class Config {
>      >
>      >     public static StringCONFIG_TOPIC;
>      >     public static StringCONFIG_KAFKA;
>      >
>      >     public Config(HashMap<String, String> s) {
>      >          CONFIG_TOPIC = s.get("CONFIG_TOPIC");
>      >          CONFIG_KAFKA = s.get("CONFIG_KAFKA");
>      >
>      >     }
>      >
>      >     }
>      >
>      >     This produces the same issue. With the easier solution that you
>      >     listed, are you implying I use multiple instances or a singleton
>      >     pattern of some sort?
>      >
>      >     On Thu, Oct 22, 2020 at 1:23 PM Timo Walther
>     <twalthr@apache.org <ma...@apache.org>
>      >     <mailto:twalthr@apache.org <ma...@apache.org>>> wrote:
>      >
>      >         Hi Manas,
>      >
>      >         you need to make sure to differentiate between what Flink
>     calls
>      >         "pre-flight phase" and "cluster phase".
>      >
>      >         The pre-flight phase is were the pipeline is constructed
>     and all
>      >         functions are instantiated. They are then later
>     serialized and
>      >         send to
>      >         the cluster.
>      >
>      >         If you are reading your properties file in the `main()`
>     method
>      >         and store
>      >         something in static variables, the content is available
>     locally
>      >         where
>      >         the pipeline is constructed (e.g. in the client) but when the
>      >         function
>      >         instances are send to the cluster. Those static variables
>     are fresh
>      >         (thus empty) in the cluster JVMs. You need to either make
>     sure
>      >         that the
>      >         properties file is read from each task manager again, or
>     easier:
>      >         pass
>      >         the parameters as constructor parameters into the
>     instances such
>      >         that
>      >         they are shipped together with the function itself.
>      >
>      >         I hope this helps.
>      >
>      >         Regards,
>      >         Timo
>      >
>      >
>      >         On 22.10.20 09:24, Manas Kale wrote:
>      >          > Hi,
>      >          > I am trying to write some data to a kafka topic and I have
>      >         the following
>      >          > situation:
>      >          >
>      >          > monitorStateStream
>      >          >
>      >          >     .process(new
>      >       
>       IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
>      >          >
>      >          >     /... // Stream that outputs elements of type
>     IDAP2Alarm/
>      >          >
>      >          > .addSink(getFlinkKafkaProducer(ALARMS_KAFKA,
>      >          > Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
>      >          >
>      >          > private static <T extends IDAP2JSONOutput>
>      >         FlinkKafkaProducer<T> getFlinkKafkaProducer(String servers,
>      >         String topic) {
>      >          >     Properties properties =new Properties();
>      >          >     properties.setProperty("bootstrap.servers", servers);
>      >          >     return new FlinkKafkaProducer<T>(topic,
>      >          >           (element, timestamp) ->
>     element.serializeForKafka(),
>      >          >           properties,
>      >          >           FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
>      >          > }
>      >          >
>      >          > /*
>      >          >     This interface is used to indicate that a class may be output to Kafka.
>      >          >     Since Kafka treats all data as bytes, classes that implement this interface
>      >          >     have to provide an implementation for the serializeForKafka() method.
>      >          >  */
>      >          > public interface IDAP2JSONOutput {
>      >          >
>      >          >     // Implement serialization logic in this method.
>      >          >     ProducerRecord<byte[], byte[]> serializeForKafka();
>      >          >
>      >          > }
>      >          >
>      >          > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
>      >          >
>      >          >     private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
>      >          >
>      >          >     @Override
>      >          >     public ProducerRecord<byte[], byte[]> serializeForKafka() {
>      >          >         byte[] rawValue;
>      >          >         byte[] rawKey;
>      >          >         String k = getMonitorFeatureKey().getMonitorName();
>      >          >         ...
>      >          >
>      >          >         rawValue = val.getBytes();
>      >          >
>      >          >         LOGGER.info("value of alarms topic from idap2 alarm : " + Config.ALARMS_TOPIC);
>      >          >
>      >          >         return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // Line 95
>      >          >     }
>      >          >
>      >          > }
>      >          >
>      >          >
>      >          > Config.ALARMS_TOPIC is a static string that is read from a properties
>      >          > file. When I run this code on my IDE minicluster, it runs great with no
>      >          > problems. But when I submit it as a jar to the cluster, I get the
>      >          > following error:
>      >          >
>      >          > Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
>      >          >      at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
>      >          >      at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
>      >          > *    at flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) ~[flink_POC-0.1.jar:?]*
>      >          >      at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
>      >          >      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
>      >          >      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
>      >          >      at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          > *    at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) ~[flink_POC-0.1.jar:?]*
>      >          > *    at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) ~[flink_POC-0.1.jar:?]*
>      >          >      at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>      >          >      at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
>      >          >
>      >          > Apparently Config.ALARMS_TOPIC is being evaluated as null. Also, the
>      >          > LOGGER statement in IDAP2Alarm above is never printed when running on
>      >          > the Flink cluster. To verify that the correct value of
>      >          > Config.ALARMS_TOPIC is read from the configuration file, I printed it from
>      >          > the Config class, and it prints correctly. So my questions are:
>      >          >
>      >          >   * Why does this work on a minicluster but not when submitted as a jar
>      >          >     to a normal cluster? I am using Flink v1.11.0 in both my POM file
>      >          >     and the cluster runtime.
>      >          >   * Why does the LOGGER line not get printed even though execution
>      >          >     definitely reached it (as seen from the stacktrace)?
>      >          >
>      >          > Thank you,
>      >          > Manas Kale
>      >
> 


Re: Flink - Kafka topic null error; happens only when running on cluster

Posted by Manas Kale <ma...@gmail.com>.
Hi Timo,
I figured it out, thanks a lot for your help.
Are there any articles detailing the pre-flight and cluster phases? I
couldn't find anything on ci.apache.org/projects/flink and I think this
behaviour should be documented as a warning/note.



Re: Flink - Kafka topic null error; happens only when running on cluster

Posted by Timo Walther <tw...@apache.org>.
Hi Manas,

You can use static variables, but you need to make sure that the logic that
fills them is accessible and executed in all JVMs.
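
A minimal plain-Java sketch of why this matters, assuming (as described
earlier in this thread) that function instances are serialized in the client
and deserialized in the cluster JVMs. The names StaticConfig, TopicSerializer
and ShippingDemo are made up for illustration, no Flink API is involved, and
resetting the static field only stands in for a fresh JVM that never ran
main().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a Config class whose static field is set in main().
final class StaticConfig {
    static String TOPIC;
}

// Hypothetical stand-in for a user function that gets shipped to the cluster.
final class TopicSerializer implements Serializable {
    private static final long serialVersionUID = 1L;

    // Instance state is part of the serialized object; static state is not.
    private final String topic;

    TopicSerializer(String topic) {
        this.topic = topic;
    }

    String topicFromField() {
        return topic;
    }

    String topicFromStatic() {
        return StaticConfig.TOPIC;
    }
}

final class ShippingDemo {

    // Serializes in the "client", forgets the static value, deserializes in the "cluster".
    static TopicSerializer simulateShipToCluster(TopicSerializer function) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(function);
            }
            // Assumption for the demo: a fresh JVM never ran main(), so the static is unset.
            StaticConfig.TOPIC = null;
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (TopicSerializer) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If StaticConfig.TOPIC is set to "alarms" and
ShippingDemo.simulateShipToCluster(new TopicSerializer(StaticConfig.TOPIC))
is called, the returned copy still reports "alarms" from topicFromField(),
while topicFromStatic() returns null.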

I assume `pipeline.properties` is in the JAR that you submit to the
cluster, right? Then you should be able to access it through a singleton
pattern instead of a plain static variable access.
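
One possible shape for such a singleton, as a sketch only: the class
PipelineConfig and its methods fromStream, getInstance and get are
hypothetical names, and only `pipeline.properties` and the CONFIG_TOPIC key
come from this thread. The file is loaded lazily on first use, so each JVM
(client or task manager) that touches the config reads it from its own
classpath.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical lazily initialised holder for the values in pipeline.properties.
final class PipelineConfig {
    private static volatile PipelineConfig instance;

    private final Properties props;

    private PipelineConfig(Properties props) {
        this.props = props;
    }

    // Parses a properties stream; kept separate so it can be used without a classpath resource.
    static PipelineConfig fromStream(InputStream in) {
        if (in == null) {
            throw new IllegalStateException("pipeline.properties not found on the classpath");
        }
        Properties loaded = new Properties();
        try (InputStream stream = in) {
            loaded.load(stream);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new PipelineConfig(loaded);
    }

    // Loads the file on first use in whichever JVM calls this method.
    static PipelineConfig getInstance() {
        PipelineConfig local = instance;
        if (local == null) {
            synchronized (PipelineConfig.class) {
                local = instance;
                if (local == null) {
                    local = fromStream(PipelineConfig.class.getClassLoader()
                            .getResourceAsStream("pipeline.properties"));
                    instance = local;
                }
            }
        }
        return local;
    }

    // Fails fast with a clear message instead of handing back null.
    String get(String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalStateException("Missing property: " + key);
        }
        return value;
    }
}
```

With something like this, code that runs on the cluster would call
PipelineConfig.getInstance().get("CONFIG_TOPIC") at the point of use rather
than reading a static field that was assigned in main().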

Regards,
Timo


On 22.10.20 14:17, Manas Kale wrote:
> Sorry, I messed up the code snippet in the earlier mail. The correct one
> is:
>
> public static void main(String[] args) {
>     Properties prop = new Properties();
>
>     InputStream is = Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
>     prop.load(is);
>
>     HashMap<String, String> strMap = new HashMap<>();
>
>     strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
>
>     new Config(strMap);
>
>     ...
>
> }
>
> public class Config {
>
>     public static String CONFIG_TOPIC;
>
>     public Config(HashMap<String, String> s) {
>         CONFIG_TOPIC = s.get("CONFIG_TOPIC");
>     }
>
> }
>
> The value of CONFIG_TOPIC in a minicluster is properly loaded but null
> when run on a cluster.
> 
> 
> On Thu, Oct 22, 2020 at 5:42 PM Manas Kale <ma...@gmail.com> wrote:
> 
>     Hi Timo,
>     Thank you for the explanation, I can start to see why I was getting
>     an exception.
>     Are you saying that I cannot use static variables at all when trying
>     to deploy to a cluster? I would like the variables to remain static
>     and not be instance-bound as they are accessed from multiple classes.
>     Based on my understanding of what you said, I implemented the
>     following pattern:
> 
>     public static void main(String[] args) {
>         Properties prop = new Properties();
>
>         InputStream is = Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
>         prop.load(is);
>
>         strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
>
>         new Config(strMap, longMap);
>
>         ...
>
>     }
>
>     public class Config {
>
>         public static String CONFIG_TOPIC;
>         public static String CONFIG_KAFKA;
>
>         public Config(HashMap<String, String> s) {
>             CONFIG_TOPIC = s.get("CONFIG_TOPIC");
>             CONFIG_KAFKA = s.get("CONFIG_KAFKA");
>         }
>
>     }
> 
>     This produces the same issue. With the easier solution that you
>     listed, are you implying I use multiple instances or a singleton
>     pattern of some sort?
> 
>     On Thu, Oct 22, 2020 at 1:23 PM Timo Walther <tw...@apache.org> wrote:
> 
>         Hi Manas,
> 
>         You need to make sure to differentiate between what Flink calls
>         "pre-flight phase" and "cluster phase".
>
>         The pre-flight phase is where the pipeline is constructed and all
>         functions are instantiated. They are then later serialized and sent
>         to the cluster.
>
>         If you are reading your properties file in the `main()` method and
>         storing something in static variables, the content is available
>         locally where the pipeline is constructed (e.g. in the client) but
>         not once the function instances are sent to the cluster. Those
>         static variables are fresh (thus empty) in the cluster JVMs. You
>         need to either make sure that the properties file is read on each
>         task manager again, or, easier, pass the parameters as constructor
>         parameters into the instances so that they are shipped together
>         with the function itself.
> 
>         I hope this helps.
> 
>         Regards,
>         Timo
> 
> 
>         On 22.10.20 09:24, Manas Kale wrote:
>          > Hi,
>          > I am trying to write some data to a kafka topic and I have the following
>          > situation:
>          >
>          > monitorStateStream
>          >
>          >     .process(new IDAP2AlarmEmitter()).name(IDAP2_ALARM_EMITTER).uid(IDAP2_ALARM_EMITTER)
>          >
>          >     ... // Stream that outputs elements of type IDAP2Alarm
>          >
>          >     .addSink(getFlinkKafkaProducer(ALARMS_KAFKA, Config.ALARMS_TOPIC)).name(ALARM_SINK).uid(ALARM_SINK);
>          >
>          > private static <T extends IDAP2JSONOutput> FlinkKafkaProducer<T> getFlinkKafkaProducer(String servers, String topic) {
>          >     Properties properties = new Properties();
>          >     properties.setProperty("bootstrap.servers", servers);
>          >     return new FlinkKafkaProducer<T>(topic,
>          >           (element, timestamp) -> element.serializeForKafka(),
>          >           properties,
>          >           FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
>          > }
>          >
>          > /*
>          >     This interface is used to indicate that a class may be output to Kafka.
>          >     Since Kafka treats all data as bytes, classes that implement this interface
>          >     have to provide an implementation for the serializeForKafka() method.
>          >  */
>          > public interface IDAP2JSONOutput {
>          >
>          >     // Implement serialization logic in this method.
>          >     ProducerRecord<byte[], byte[]> serializeForKafka();
>          >
>          > }
>          >
>          > public class IDAP2Alarm extends Tuple5<...> implements IDAP2JSONOutput {
>          >
>          >     private final Logger LOGGER = LoggerFactory.getLogger(IDAP2Alarm.class);
>          >
>          >     @Override
>          >     public ProducerRecord<byte[], byte[]> serializeForKafka() {
>          >         byte[] rawValue;
>          >         byte[] rawKey;
>          >         String k = getMonitorFeatureKey().getMonitorName();
>          >         ...
>          >
>          >         rawValue = val.getBytes();
>          >
>          >         LOGGER.info("value of alarms topic from idap2 alarm : " + Config.ALARMS_TOPIC);
>          >
>          >         return new ProducerRecord<>(Config.ALARMS_TOPIC, rawKey, rawValue); // Line 95
>          >     }
>          >
>          > }
>          >
>          >
>          > Config.ALARMS_TOPIC is a static string that is read from a properties
>          > file. When I run this code on my IDE minicluster, it runs great with no
>          > problems. But when I submit it as a jar to the cluster, I get the
>          > following error:
>          >
>          > Caused by: java.lang.IllegalArgumentException: Topic cannot be null.
>          >      at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:71) ~[flink_POC-0.1.jar:?]
>          >      at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:133) ~[flink_POC-0.1.jar:?]
>          > *    at flink_POC.idap2.containers.IDAP2Alarm.serializeForKafka(IDAP2Alarm.java:95) ~[flink_POC-0.1.jar:?]*
>          >      at flink_POC.StreamingJob.lambda$getFlinkKafkaProducer$af2c9cb2$1(StreamingJob.java:62) ~[flink_POC-0.1.jar:?]
>          >      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:854) ~[flink_POC-0.1.jar:?]
>          >      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink_POC-0.1.jar:?]
>          >      at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          > *    at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:69) ~[flink_POC-0.1.jar:?]*
>          > *    at flink_POC.idap2.IDAP2AlarmEmitter.processElement(IDAP2AlarmEmitter.java:25) ~[flink_POC-0.1.jar:?]*
>          >      at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at org.apache.flink.streaming.runtime.io
>         <http://org.apache.flink.streaming.runtime.io>
>          >
>         <http://org.apache.flink.streaming.runtime.io>.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
> 
>          > ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at org.apache.flink.streaming.runtime.io
>         <http://org.apache.flink.streaming.runtime.io>
>          >
>         <http://org.apache.flink.streaming.runtime.io>.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> 
>          > ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at
>          >
>         org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
> 
>          > ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at
>          >
>         org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> 
>          > ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at
>          >
>         org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> 
>          > ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at
>          >
>         org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
> 
>          > ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at
>          >
>         org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
> 
>          > ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at
>         org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>          > ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at
>         org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>          > ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>          >      at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
>          >
>          > Apparently Config.ALARM_TOPIC is being evaluated as null.
>         Also, the
>          > LOGGER statement in IDAP2Alarm above is never printed when
>         running on
>          > Flink cluster. In order to verify if the correct value of
>          > Config.ALARM_TOPIC is read from configuration file, I printed
>         it from
>          > Config class - and it prints correctly. So my questions are:
>          >
>          >   * Why does this work on a minicluster but not when
>         submitted as a jar
>          >     to a normal cluster? I am using Flink v1.11.0 in both my
>         POM file
>          >     and the cluster runtime.
>          >   * Why does the LOGGER line not get printed even though
>         execution
>          >     definitely reached it (as seen from the stacktrace)?
>          >
>          > Thank you,
>          > Manas Kale
> 


Re: Flink - Kafka topic null error; happens only when running on cluster

Posted by Manas Kale <ma...@gmail.com>.
Sorry, I messed up the code snippet in the earlier mail. The correct one is:

public static void main(String[] args) throws IOException {
    Properties prop = new Properties();

    // pipeline.properties is read from the classpath of the JVM that runs main()
    InputStream is = Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
    prop.load(is);

    HashMap<String, String> strMap = new HashMap<>();
    strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));

    // The constructor copies the values into Config's static fields
    new Config(strMap);

    ...
}

public class Config {

    public static String CONFIG_TOPIC;

    public Config(HashMap<String, String> s) {
        CONFIG_TOPIC = s.get("CONFIG_TOPIC");
    }
}

The value of CONFIG_TOPIC in a minicluster is properly loaded but null when
run on a cluster.
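
For illustration only, here is a minimal sketch of one way to keep the values static and still have them populated on the cluster: load the properties in a static holder, so that every JVM that touches the class reads the file itself instead of relying on main(). It assumes pipeline.properties is packaged in the job jar and visible to the class loader on the task managers. LazyConfig, loadFromClasspath, parse and get are hypothetical names, not part of Flink or of the code in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public final class LazyConfig {

    private LazyConfig() {}

    // Resource name taken from the snippet in this thread (assumed to be in the job jar).
    static final String RESOURCE = "pipeline.properties";

    // Initialization-on-demand holder: each JVM runs this initializer once,
    // the first time get() is called in that JVM (client or task manager).
    private static final class Holder {
        static final Properties PROPS = loadFromClasspath(RESOURCE);
    }

    // Reads a properties file through the class loader that loaded LazyConfig.
    static Properties loadFromClasspath(String resource) {
        try (InputStream in = LazyConfig.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                throw new IllegalStateException(resource + " not found on the classpath");
            }
            return parse(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Parses key=value pairs from any stream; split out so it can be exercised without the file.
    static Properties parse(InputStream in) throws IOException {
        Properties props = new Properties();
        props.load(in);
        return props;
    }

    // Static accessor usable from any class, in any JVM.
    public static String get(String key) {
        return Holder.PROPS.getProperty(key);
    }

    public static void main(String[] args) throws IOException {
        // Exercise only the parsing step so the sketch runs without the real file.
        InputStream in = new ByteArrayInputStream(
                "CONFIG_TOPIC=alarms\n".getBytes(StandardCharsets.ISO_8859_1));
        System.out.println(parse(in).getProperty("CONFIG_TOPIC")); // prints "alarms"
    }
}
```

With something like this, a call such as LazyConfig.get("CONFIG_TOPIC") would trigger the load in whichever JVM executes it, which is the "read the properties file on each task manager" option. Whether the resource is actually visible there depends on how the jar is built and deployed.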


On Thu, Oct 22, 2020 at 5:42 PM Manas Kale <ma...@gmail.com> wrote:

> Hi Timo,
> Thank you for the explanation, I can start to see why I was getting an
> exception.
> Are you saying that I cannot use static variables at all when trying to
> deploy to a cluster? I would like the variables to remain static and not be
> instance-bound as they are accessed from multiple classes.
> Based on my understanding of what you said, I implemented the
> following pattern:
>
> public static void main(String[] args) {
>        Properties prop = new Properties();
>
> InputStream is = Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
> prop.load(is);
>
> strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));
>
> new Config(strMap, longMap);
>
> ...
>
> }
>
> public class Config {
>
> public static String CONFIG_TOPIC;
> public static String CONFIG_KAFKA;
>
> public Config(HashMap<String, String> s) {
>     CONFIG_TOPIC = s.get("CONFIG_TOPIC");
>     CONFIG_KAFKA = s.get("CONFIG_KAFKA");
>
> }
>
> }
>
> This produces the same issue. With the easier solution that you listed,
> are you implying I use multiple instances or a singleton pattern of some
> sort?

Re: Flink - Kafka topic null error; happens only when running on cluster

Posted by Manas Kale <ma...@gmail.com>.
Hi Timo,
Thank you for the explanation, I can start to see why I was getting an
exception.
Are you saying that I cannot use static variables at all when trying to
deploy to a cluster? I would like the variables to remain static and not be
instance-bound as they are accessed from multiple classes.
Based on my understanding of what you said, I implemented the
following pattern:

public static void main(String[] args) {
       Properties prop = new Properties();

InputStream is =
Config.class.getClassLoader().getResourceAsStream("pipeline.properties");
prop.load(is);

strMap.put("CONFIG_TOPIC", prop.getProperty("CONFIG_TOPIC"));

new Config(strMap, longMap);

...

}

public class Config {

public static String CONFIG_TOPIC;
public static String CONFIG_KAFKA;

public Config(HashMap<String, String> s) {
    CONFIG_TOPIC = s.get("CONFIG_TOPIC");
    CONFIG_KAFKA = s.get("CONFIG_KAFKA");

}

}

This produces the same issue. With the easier solution that you listed, are
you implying I use multiple instances or a singleton pattern of some sort?

On Thu, Oct 22, 2020 at 1:23 PM Timo Walther <tw...@apache.org> wrote:

> Hi Manas,
>
> You need to differentiate between what Flink calls the "pre-flight phase"
> and the "cluster phase".
>
> The pre-flight phase is where the pipeline is constructed and all
> functions are instantiated. They are then later serialized and sent to
> the cluster.
>
> If you read your properties file in the `main()` method and store
> something in static variables, the content is available locally where
> the pipeline is constructed (e.g. in the client), but not once the function
> instances are sent to the cluster: those static variables are fresh
> (thus empty) in the cluster JVMs. You need to either make sure that the
> properties file is read again on each task manager, or, easier, pass
> the parameters as constructor parameters into the instances so that
> they are shipped together with the function itself.
>
> I hope this helps.
>
> Regards,
> Timo

Re: Flink - Kafka topic null error; happens only when running on cluster

Posted by Timo Walther <tw...@apache.org>.
Hi Manas,

You need to differentiate between what Flink calls the "pre-flight phase"
and the "cluster phase".

The pre-flight phase is where the pipeline is constructed and all
functions are instantiated. They are then later serialized and sent to
the cluster.

If you read your properties file in the `main()` method and store
something in static variables, the content is available locally where
the pipeline is constructed (e.g. in the client), but not once the function
instances are sent to the cluster: those static variables are fresh
(thus empty) in the cluster JVMs. You need to either make sure that the
properties file is read again on each task manager, or, easier, pass
the parameters as constructor parameters into the instances so that
they are shipped together with the function itself.
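
To make the difference concrete, here is a small stand-alone sketch using plain Java serialization rather than Flink itself. It is not the code from this job: ShippedConfigSketch, StaticConfig, StaticTopicSerializer and InstanceTopicSerializer are hypothetical names, and setting the static field back to null only simulates a fresh cluster JVM in which main() never ran. The point it illustrates is that static fields are not part of an object's serialized form, while instance fields set through the constructor are.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ShippedConfigSketch {

    // Stand-in for a Config class with static fields; static state is never serialized.
    static class StaticConfig {
        static String topic;
    }

    // Looks the topic up in static state each time it is invoked.
    static class StaticTopicSerializer implements Serializable {
        private static final long serialVersionUID = 1L;

        String topicFor(String element) {
            return StaticConfig.topic;
        }
    }

    // Receives the topic through its constructor, so the value travels with the instance.
    static class InstanceTopicSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String topic;

        InstanceTopicSerializer(String topic) {
            this.topic = topic;
        }

        String topicFor(String element) {
            return topic;
        }
    }

    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T> T fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // "Pre-flight": main() fills the static field and builds the function objects.
        StaticConfig.topic = "alarms";
        byte[] viaStatic = toBytes(new StaticTopicSerializer());
        byte[] viaField = toBytes(new InstanceTopicSerializer(StaticConfig.topic));

        // Simulate a fresh cluster JVM: the static field was never assigned there.
        StaticConfig.topic = null;

        StaticTopicSerializer s1 = fromBytes(viaStatic);
        InstanceTopicSerializer s2 = fromBytes(viaField);
        System.out.println(s1.topicFor("x")); // prints "null"
        System.out.println(s2.topicFor("x")); // prints "alarms"
    }
}
```

In the job from this thread, the analogous change would presumably be to hand the topic to the object that builds the ProducerRecord (for example through the constructor of the function or serialization schema passed to the sink) instead of reading Config.ALARMS_TOPIC inside serializeForKafka().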

I hope this helps.

Regards,
Timo

