You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "freeza1982@outlook.com" <fr...@outlook.com> on 2020/10/20 05:48:20 UTC

单任务多条流的逻辑报错

Hi all:

请问我用flink1.10.2版本,写了1个代码,这个代码本地可以跑起来,但是以任务方式发布到flink中,启动就报错,异常如下, 请问是什么原因?
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class invalid for deserialization
    at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
    at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)

大致逻辑如下, 我有2条流:
1.通过多个kafkasource,得到多个流后union,然后这个union的单流经过2个算子,最后sink到kafka
2.通过单个kafkasource,得到流,经过1个算子,最后sink到kafka
代码如下:
 // Builds two independent pipelines on one execution environment and submits them as a single job:
 //   1) N Kafka sources (getSource0) -> union -> flatMap -> keyBy -> flatMap -> Kafka sink
 //   2) 1 Kafka source (first entry of getSource1) -> flatMap -> Kafka sink
 StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        List<KafkaInfo> kafkaSourceConfiguration = this.kafkaConfiguration.getSource0();
        KafkaInfo kafkaSinkConfiguration = this.kafkaConfiguration.getSink();
        // User functions for pipeline 1.
        RecordTransformOperator transformOperator = new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
        RecordKeySelector keySelector = new RecordKeySelector();
        RecordComputeOperator computeOperator = new RecordComputeOperator();
        // Sink for pipeline 1; only bootstrap.servers is configured on the producer.
        Properties sinkProperties = new Properties();
        sinkProperties.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
        // NOTE(review): FlinkKafkaProducer011 is declared and constructed as a raw type (no type argument).
        // NOTE(review): the reported InvalidClassException names this class; presumably the Kafka connector
        // classes on the cluster differ from (or duplicate) those bundled in the job jar — confirm.
        FlinkKafkaProducer011 flinkKafkaProducer
                = new FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties);

        // One Kafka consumer source per configured KafkaInfo entry; each gets its own Properties instance.
        List<SingleOutputStreamOperator<Tuple2<String, String>>> dataStreamList = new ArrayList<>();
        for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
            Properties sourceProperties = new Properties();
            sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
            sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
            sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
            // put(...) is used here instead of setProperty(...); presumably getMaxPollIntervalMs()
            // does not return a String — TODO confirm.
            sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
            String topicName = kafkaInfo.getTopicName();
            // The consumer is constructed as a raw type and assigned to a parameterized variable (unchecked).
            FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer
                    = new FlinkKafkaConsumer011(topicName,
                    new KafkaDeserializer(),
                    sourceProperties);
            SingleOutputStreamOperator<Tuple2<String, String>> singleOutputStreamOperator =
                    streamExecutionEnvironment.addSource(flinkKafkaConsumer);
            dataStreamList.add(singleOutputStreamOperator);
        }

        // Fold all source streams into one stream via union.
        // get(0) throws IndexOutOfBoundsException when getSource0() yielded no entries.
        DataStream<Tuple2<String, String>> unionDataStream = dataStreamList.get(0);
        for(int i = 1; i<dataStreamList.size(); i++) {
            unionDataStream = unionDataStream.union(dataStreamList.get(i));
        }
        // Pipeline 1: transform -> key by -> compute -> Kafka sink.
        unionDataStream.flatMap(transformOperator)
                .keyBy(keySelector)
                .flatMap(computeOperator)
                .addSink(flinkKafkaProducer);

        // Pipeline 2: uses a separate transform operator instance and a separate producer instance,
        // but the producer is built from the same sink configuration (same topic and bootstrap servers) as pipeline 1.
        RecordTransformOperator transformOperator1 = new RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM);
        Properties sinkProperties1 = new Properties();
        sinkProperties1.setProperty("bootstrap.servers", kafkaSinkConfiguration.getBootstrapServer());
        FlinkKafkaProducer011 flinkKafkaProducer1
                = new FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(), new KafkaSerializer(), sinkProperties1);
        // Only the first entry of getSource1() is used.
        KafkaInfo kafkaInfo  = this.kafkaConfiguration.getSource1().get(0);
        Properties sourceProperties = new Properties();
        sourceProperties.setProperty("bootstrap.servers", kafkaInfo.getBootstrapServer());
        sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
        sourceProperties.setProperty("max.poll.records", kafkaInfo.getMaxPollRecord());
        sourceProperties.put("max.poll.interval.ms", kafkaInfo.getMaxPollIntervalMs());
        String topicName = kafkaInfo.getTopicName();
        FlinkKafkaConsumer011<Tuple2<String, String>> flinkKafkaConsumer
                = new FlinkKafkaConsumer011(topicName,
                new KafkaDeserializer(),
                sourceProperties);
        streamExecutionEnvironment
                .addSource(flinkKafkaConsumer)
                .flatMap(transformOperator1)
                .addSink(flinkKafkaProducer1);
        // Both pipelines were registered on the same environment, so this single call runs them as one job.
        streamExecutionEnvironment.execute();



freeza1982@outlook.com

Re: Re: 单任务多条流的逻辑报错

Posted by "freeza1982@outlook.com" <fr...@outlook.com>.
InstantiationUtil这个类我代码中并未显式调用,具体应该如何定位问题原因,烦请看下能否提供下可行的解决方案



freeza1982@outlook.com
 
发件人: Robin Zhang
发送时间: 2020-10-20 14:39
收件人: user-zh
主题: Re: 单任务多条流的逻辑报错
Hi,
   根据报错内容,定位到你的代码在
      at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
      at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
      at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
      at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
InstantiationUtil类的处理有问题,应该是反序列化问题。本地测试没问题,是因为本地不涉及到序列化。
 
Best,
Robin
 
 
 
 
freeza1982@outlook.com wrote
> Hi all:
> 
> 请问我用flink1.10.2版本,写了1个代码,这个代码本地可以跑起来,但是以任务方式发布到flink中,启动就报错,异常如下, 请问是什么原因?
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
>     at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
>     at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
>     at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class
> invalid for deserialization
>     at
> java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
>     at
> java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>     at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>     at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>     at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>     at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> 
> 大致逻辑如下, 我有2条流:
> 1.通过多个kafkasource,得到多个流后union,然后这个union的单流经过2个算子,最后sink到kafka
> 2.通过单个kafkasource,得到流,经过1个算子,最后sink到kafka
> 代码如下:
>  StreamExecutionEnvironment streamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         List<KafkaInfo> kafkaSourceConfiguration = this.kafkaConfiguration.getSource0();
>         KafkaInfo kafkaSinkConfiguration =
> this.kafkaConfiguration.getSink();
>         RecordTransformOperator transformOperator = new
> RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
>         RecordKeySelector keySelector = new RecordKeySelector();
>         RecordComputeOperator computeOperator = new
> RecordComputeOperator();
>         Properties sinkProperties = new Properties();
>         sinkProperties.setProperty("bootstrap.servers",
> kafkaSinkConfiguration.getBootstrapServer());
>         FlinkKafkaProducer011 flinkKafkaProducer
>                 = new
> FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(), new
> KafkaSerializer(), sinkProperties);
> 
>         List<SingleOutputStreamOperator<Tuple2<String,
> String>>> dataStreamList = new ArrayList<>();
>         for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
>             Properties sourceProperties = new Properties();
>             sourceProperties.setProperty("bootstrap.servers",
> kafkaInfo.getBootstrapServer());
>             sourceProperties.setProperty("group.id",
> kafkaInfo.getGroupId());
>             sourceProperties.setProperty("max.poll.records",
> kafkaInfo.getMaxPollRecord());
>             sourceProperties.put("max.poll.interval.ms",
> kafkaInfo.getMaxPollIntervalMs());
>             String topicName = kafkaInfo.getTopicName();
>             FlinkKafkaConsumer011<Tuple2<String, String>>
> flinkKafkaConsumer
>                     = new FlinkKafkaConsumer011(topicName,
>                     new KafkaDeserializer(),
>                     sourceProperties);
>             SingleOutputStreamOperator<Tuple2<String, String>>
> singleOutputStreamOperator =
>                    
> streamExecutionEnvironment.addSource(flinkKafkaConsumer);
>             dataStreamList.add(singleOutputStreamOperator);
>         }
> 
>         DataStream<Tuple2<String, String>> unionDataStream =
> dataStreamList.get(0);
>         for(int i = 1; i<dataStreamList.size(); i++) {
>             unionDataStream =
> unionDataStream.union(dataStreamList.get(i));
>         }
>         unionDataStream.flatMap(transformOperator)
>                 .keyBy(keySelector)
>                 .flatMap(computeOperator)
>                 .addSink(flinkKafkaProducer);
> 
>         RecordTransformOperator transformOperator1 = new
> RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM);
>         Properties sinkProperties1 = new Properties();
>         sinkProperties1.setProperty("bootstrap.servers",
> kafkaSinkConfiguration.getBootstrapServer());
>         FlinkKafkaProducer011 flinkKafkaProducer1
>                 = new
> FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(), new
> KafkaSerializer(), sinkProperties1);
>         KafkaInfo kafkaInfo  =
> this.kafkaConfiguration.getSource1().get(0);
>         Properties sourceProperties = new Properties();
>         sourceProperties.setProperty("bootstrap.servers",
> kafkaInfo.getBootstrapServer());
>         sourceProperties.setProperty("group.id",
> kafkaInfo.getGroupId());
>         sourceProperties.setProperty("max.poll.records",
> kafkaInfo.getMaxPollRecord());
>         sourceProperties.put("max.poll.interval.ms",
> kafkaInfo.getMaxPollIntervalMs());
>         String topicName = kafkaInfo.getTopicName();
>         FlinkKafkaConsumer011<Tuple2<String, String>>
> flinkKafkaConsumer
>                 = new FlinkKafkaConsumer011(topicName,
>                 new KafkaDeserializer(),
>                 sourceProperties);
>         streamExecutionEnvironment
>                 .addSource(flinkKafkaConsumer)
>                 .flatMap(transformOperator1)
>                 .addSink(flinkKafkaProducer1);
>         streamExecutionEnvironment.execute();
> 
> 
 
> freeza1982@
 
 
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 单任务多条流的逻辑报错

Posted by Robin Zhang <vi...@outlook.com>.
Hi,
   根据报错内容,定位到你的代码在
      at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
      at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
      at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
      at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
InstantiationUtil类的处理有问题,应该是反序列化问题。本地测试没问题,是因为本地不涉及到序列化。

Best,
Robin




freeza1982@outlook.com wrote
> Hi all:
> 
> 请问我用flink1.10.2版本,写了1个代码,这个代码本地可以跑起来,但是以任务方式发布到flink中,启动就报错,异常如下, 请问是什么原因?
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
>     at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
>     at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
>     at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class
> invalid for deserialization
>     at
> java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
>     at
> java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>     at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>     at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>     at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>     at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> 
> 大致逻辑如下, 我有2条流:
> 1.通过多个kafkasource,得到多个流后union,然后这个union的单流经过2个算子,最后sink到kafka
> 2.通过单个kafkasource,得到流,经过1个算子,最后sink到kafka
> 代码如下:
>  StreamExecutionEnvironment streamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         List<KafkaInfo> kafkaSourceConfiguration = this.kafkaConfiguration.getSource0();
>         KafkaInfo kafkaSinkConfiguration =
> this.kafkaConfiguration.getSink();
>         RecordTransformOperator transformOperator = new
> RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
>         RecordKeySelector keySelector = new RecordKeySelector();
>         RecordComputeOperator computeOperator = new
> RecordComputeOperator();
>         Properties sinkProperties = new Properties();
>         sinkProperties.setProperty("bootstrap.servers",
> kafkaSinkConfiguration.getBootstrapServer());
>         FlinkKafkaProducer011 flinkKafkaProducer
>                 = new
> FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(), new
> KafkaSerializer(), sinkProperties);
> 
>         List<SingleOutputStreamOperator<Tuple2<String,
> String>>> dataStreamList = new ArrayList<>();
>         for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
>             Properties sourceProperties = new Properties();
>             sourceProperties.setProperty("bootstrap.servers",
> kafkaInfo.getBootstrapServer());
>             sourceProperties.setProperty("group.id",
> kafkaInfo.getGroupId());
>             sourceProperties.setProperty("max.poll.records",
> kafkaInfo.getMaxPollRecord());
>             sourceProperties.put("max.poll.interval.ms",
> kafkaInfo.getMaxPollIntervalMs());
>             String topicName = kafkaInfo.getTopicName();
>             FlinkKafkaConsumer011<Tuple2<String, String>>
> flinkKafkaConsumer
>                     = new FlinkKafkaConsumer011(topicName,
>                     new KafkaDeserializer(),
>                     sourceProperties);
>             SingleOutputStreamOperator<Tuple2<String, String>>
> singleOutputStreamOperator =
>                    
> streamExecutionEnvironment.addSource(flinkKafkaConsumer);
>             dataStreamList.add(singleOutputStreamOperator);
>         }
> 
>         DataStream<Tuple2<String, String>> unionDataStream =
> dataStreamList.get(0);
>         for(int i = 1; i<dataStreamList.size(); i++) {
>             unionDataStream =
> unionDataStream.union(dataStreamList.get(i));
>         }
>         unionDataStream.flatMap(transformOperator)
>                 .keyBy(keySelector)
>                 .flatMap(computeOperator)
>                 .addSink(flinkKafkaProducer);
> 
>         RecordTransformOperator transformOperator1 = new
> RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM);
>         Properties sinkProperties1 = new Properties();
>         sinkProperties1.setProperty("bootstrap.servers",
> kafkaSinkConfiguration.getBootstrapServer());
>         FlinkKafkaProducer011 flinkKafkaProducer1
>                 = new
> FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(), new
> KafkaSerializer(), sinkProperties1);
>         KafkaInfo kafkaInfo  =
> this.kafkaConfiguration.getSource1().get(0);
>         Properties sourceProperties = new Properties();
>         sourceProperties.setProperty("bootstrap.servers",
> kafkaInfo.getBootstrapServer());
>         sourceProperties.setProperty("group.id",
> kafkaInfo.getGroupId());
>         sourceProperties.setProperty("max.poll.records",
> kafkaInfo.getMaxPollRecord());
>         sourceProperties.put("max.poll.interval.ms",
> kafkaInfo.getMaxPollIntervalMs());
>         String topicName = kafkaInfo.getTopicName();
>         FlinkKafkaConsumer011<Tuple2<String, String>>
> flinkKafkaConsumer
>                 = new FlinkKafkaConsumer011(topicName,
>                 new KafkaDeserializer(),
>                 sourceProperties);
>         streamExecutionEnvironment
>                 .addSource(flinkKafkaConsumer)
>                 .flatMap(transformOperator1)
>                 .addSink(flinkKafkaProducer1);
>         streamExecutionEnvironment.execute();
> 
> 

> freeza1982@





--
Sent from: http://apache-flink.147419.n8.nabble.com/