Posted to user@flink.apache.org by chandresh pancholi <ch...@gmail.com> on 2018/03/27 10:15:00 UTC

java.lang.IllegalArgumentException: The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.

Flow:
Producer -> Kafka (Avro) -> Flink Kafka connector with Avro deserializer -> Flink -> ES

Kafka: latest version
Flink: 1.4.2
ES: 5.5.2

@Service
public class FlinkStream {

    @Autowired
    private ClientService clientService;

    @Autowired
    private AppConfig appConfig;

    @PostConstruct
    public void init() {
        List<Client> clientList = clientService.getAllEnableTenant();
        clientList.stream().forEach(client -> {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            ConfluentAvroDeserializationSchema schema = new ConfluentAvroDeserializationSchema(appConfig.getKafkaSchemaRegistry());
            Properties properties = buildKafkaConsumerProperties(client.getTopic());

            FlinkKafkaConsumer011<String> flinkKafkaConsumer = new FlinkKafkaConsumer011<String>(client.getTopic(), schema, properties);

            DataStream<String> kafkaStream = env.addSource(flinkKafkaConsumer);
            writeTOEs(kafkaStream, client);
            try {
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    public Properties buildKafkaConsumerProperties(String clientTopic) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.getKafkaBootstrapServers());
        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, appConfig.getKafkaFetchMinBytes());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, appConfig.getKafkaAutoCommit());
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, appConfig.getKafkaAutoCommitInterval());
        properties.put("specific.avro.reader", true);
        properties.put("schema.registry.url", appConfig.getKafkaSchemaRegistry());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, appConfig.getKafkaKeyDeserializer());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, appConfig.getKafkaValueDeserializer());

        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, clientTopic);

        return properties;
    }

    public void writeTOEs(DataStream dataStream, Client client) {
        HashMap<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");
        config.put("cluster.name", appConfig.getElasticsearchCluster());

        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        for (String tokenizedHost : appConfig.getElasticsearchHost().split(",")) {
            try {
                transportAddresses.add(new InetSocketAddress(InetAddress.getByName(tokenizedHost), 9300));
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
        }

        dataStream.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String element) {
                Map<String, String> json = new HashMap<>();
                json.put("data", element);

                return Requests.indexRequest()
                        .index(client.getIndexName())
                        .type(client.getIndexName() + "-type")
                        .source(json);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        }));
    }
}




-- 
Chandresh Pancholi
Senior Software Engineer
Flipkart.com
Email-id:chandresh.pancholi@flipkart.com
Contact:08951803660

Re: java.lang.IllegalArgumentException: The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.

Posted by chandresh pancholi <ch...@gmail.com>.
Hi,

Thank you for the response. I have made the suggested changes, but now I am
getting "Caused by: java.lang.NoClassDefFoundError: scala/Product$class".
I am running my application on Spring Boot 2.0. Is there a better platform
to run Flink code?

Caused by: java.lang.NoClassDefFoundError: scala/Product$class
    at akka.util.Timeout.<init>(Timeout.scala:13) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:328) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:683) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:245) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:288) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:263) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.create(ActorSystem.scala:191) ~[akka-actor_2.11-2.4.20.jar:na]

On Tue, Mar 27, 2018 at 3:54 PM, Chesnay Schepler <ch...@apache.org>
wrote:

> Your anonymous ElasticsearchSinkFunction accesses the client variable that
> is defined outside of the function.
> For the function to be serializable, said Client must be as well.
>
> I suggest turning your function into a named class with a constructor that
> accepts the indexName.


-- 
Chandresh Pancholi
Senior Software Engineer
Flipkart.com
Email-id:chandresh.pancholi@flipkart.com
Contact:08951803660

Re: java.lang.IllegalArgumentException: The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.

Posted by Chesnay Schepler <ch...@apache.org>.
Your anonymous ElasticsearchSinkFunction accesses the client variable 
that is defined outside of the function.
For the function to be serializable, said Client must be as well.

I suggest turning your function into a named class with a constructor
that accepts the indexName.
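
A minimal sketch of that refactoring (the class name ClientIndexSinkFunction and
its wiring are illustrative, not from the original message): the function only
keeps the String index name, so the non-serializable Client never ends up in its
closure.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

// Illustrative sketch, not code from the thread: a named sink function whose
// only field is a String, so Flink can serialize it and ship it to the workers.
public class ClientIndexSinkFunction implements ElasticsearchSinkFunction<String> {

    private final String indexName;

    public ClientIndexSinkFunction(String indexName) {
        this.indexName = indexName;
    }

    private IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index(indexName)
                .type(indexName + "-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}

In writeTOEs it could then be used, for example, as:

dataStream.addSink(new ElasticsearchSink<>(config, transportAddresses,
        new ClientIndexSinkFunction(client.getIndexName())));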
