Posted to user@flink.apache.org by chandresh pancholi <ch...@gmail.com> on 2018/03/27 10:15:00 UTC
java.lang.IllegalArgumentException: The implementation of the
provided ElasticsearchSinkFunction is not serializable. The object probably
contains or references non-serializable fields.
Flow:
Producer -> Kafka (Avro) -> Flink Kafka connector with Avro deserializer -> Flink -> ES
Kafka: latest version
Flink: 1.4.2
ES: 5.5.2
@Service
public class FlinkStream {

    @Autowired
    private ClientService clientService;

    @Autowired
    private AppConfig appConfig;

    @PostConstruct
    public void init() {
        List<Client> clientList = clientService.getAllEnableTenant();
        clientList.stream().forEach(client -> {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            ConfluentAvroDeserializationSchema schema =
                    new ConfluentAvroDeserializationSchema(appConfig.getKafkaSchemaRegistry());
            Properties properties = buildKafkaConsumerProperties(client.getTopic());
            FlinkKafkaConsumer011<String> flinkKafkaConsumer =
                    new FlinkKafkaConsumer011<String>(client.getTopic(), schema, properties);
            DataStream<String> kafkaStream = env.addSource(flinkKafkaConsumer);
            writeTOEs(kafkaStream, client);
            try {
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    public Properties buildKafkaConsumerProperties(String clientTopic) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.getKafkaBootstrapServers());
        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, appConfig.getKafkaFetchMinBytes());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, appConfig.getKafkaAutoCommit());
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, appConfig.getKafkaAutoCommitInterval());
        properties.put("specific.avro.reader", true);
        properties.put("schema.registry.url", appConfig.getKafkaSchemaRegistry());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, appConfig.getKafkaKeyDeserializer());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, appConfig.getKafkaValueDeserializer());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, clientTopic);
        return properties;
    }

    public void writeTOEs(DataStream dataStream, Client client) {
        HashMap<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");
        config.put("cluster.name", appConfig.getElasticsearchCluster());

        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        for (String tokenizedHost : appConfig.getElasticsearchHost().split(",")) {
            try {
                transportAddresses.add(new InetSocketAddress(InetAddress.getByName(tokenizedHost), 9300));
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
        }

        dataStream.addSink(new ElasticsearchSink<>(config, transportAddresses,
                new ElasticsearchSinkFunction<String>() {
                    public IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);
                        return Requests.indexRequest()
                                .index(client.getIndexName())
                                .type(client.getIndexName() + "-type")
                                .source(json);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }));
    }
}
--
Chandresh Pancholi
Senior Software Engineer
Flipkart.com
Email-id:chandresh.pancholi@flipkart.com
Contact:08951803660
Re: java.lang.IllegalArgumentException: The implementation of the
provided ElasticsearchSinkFunction is not serializable. The object probably
contains or references non-serializable fields.
Posted by chandresh pancholi <ch...@gmail.com>.
Hi,
Thank you for the response. I have made the suggested changes, but now I am
getting "Caused by: java.lang.NoClassDefFoundError: scala/Product$class".
I am running my application on Spring Boot 2.0. Is there a better platform
to run Flink code?
Caused by: java.lang.NoClassDefFoundError: scala/Product$class
at akka.util.Timeout.<init>(Timeout.scala:13)
~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:328)
~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:683)
~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorSystem$.apply(ActorSystem.scala:245)
~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorSystem$.apply(ActorSystem.scala:288)
~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorSystem$.apply(ActorSystem.scala:263)
~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorSystem$.create(ActorSystem.scala:191)
~[akka-actor_2.11-2.4.20.jar:na]
On Tue, Mar 27, 2018 at 3:54 PM, Chesnay Schepler <ch...@apache.org>
wrote:
> Your anonymous ElasticsearchSinkFunction accesses the client variable that
> is defined outside of the function.
> For the function to be serializable, said Client must be as well.
>
> I suggest turning your function into a named class with a constructor that
> accepts the indexName.
>
>
> On 27.03.2018 12:15, chandresh pancholi wrote:
> [...]
--
Chandresh Pancholi
Senior Software Engineer
Flipkart.com
Email-id:chandresh.pancholi@flipkart.com
Contact:08951803660
Re: java.lang.IllegalArgumentException: The implementation of the
provided ElasticsearchSinkFunction is not serializable. The object probably
contains or references non-serializable fields.
Posted by Chesnay Schepler <ch...@apache.org>.
Your anonymous ElasticsearchSinkFunction accesses the client variable
that is defined outside of the function.
For the function to be serializable, said Client must be as well.
I suggest turning your function into a named class with a constructor
that accepts the indexName.
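For illustration, a rough sketch of what that could look like (the class
name here is made up; the point is that it only captures the index name
String, so nothing non-serializable is pulled into the function):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

// Hypothetical named sink function: serializable because its only field
// is a String, unlike the anonymous class that captured the Client bean.
public class IndexingSinkFunction implements ElasticsearchSinkFunction<String> {

    private final String indexName;

    public IndexingSinkFunction(String indexName) {
        this.indexName = indexName;
    }

    private IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest()
                .index(indexName)
                .type(indexName + "-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}

In writeTOEs you would then pass it like this:

dataStream.addSink(new ElasticsearchSink<>(config, transportAddresses,
        new IndexingSinkFunction(client.getIndexName())));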
On 27.03.2018 12:15, chandresh pancholi wrote:
> [...]