Posted to dev@flink.apache.org by Vijay Srinivasaraghavan <vi...@yahoo.com.INVALID> on 2017/04/28 02:57:44 UTC

ElasticsearchSink Serialization Error

Hello,
I am seeing the error below when I try to use ElasticsearchSink. It complains about serialization, and it seems to be related to my "IndexRequestBuilder" implementation. I tried the suggestion in http://stackoverflow.com/questions/33246864/elasticsearch-sink-seralizability (changing the anonymous class to a concrete class), but it did not help. However, when I call "ElasticsearchSink<>(config, transports, null)", passing "null" for the "IndexRequestBuilder", the serialization error goes away. This suggests the problem lies in the IndexRequestBuilder implementation, but I have not been able to narrow it down further.
Could someone please let me know the right way to use the ElasticsearchSink() API?
Build Details
Flink 1.2.0
Elasticsearch 5.3.0

Error Message

org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1539)
        at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:161)
        at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1076)
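The check that fails here can be reproduced outside Flink. Judging from the message and the ClosureCleaner frame, Flink tries to Java-serialize the sink (including the IndexRequestBuilder it holds) and reports any failure as this InvalidProgramException; the underlying cause, cut off in the trace above, is likely a java.io.NotSerializableException naming the offending class. The sketch below is a hypothetical helper (`SerializabilityCheck.findNonSerializableClass` is not a Flink API) that serializes an object in memory with plain java.io and returns that class name, so a builder can be tested on its own.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical debugging helper, not part of Flink.
public class SerializabilityCheck {

    /**
     * Java-serializes the candidate into an in-memory buffer.
     * Returns the name of the first non-serializable class encountered,
     * or Optional.empty() if serialization succeeds.
     */
    static Optional<String> findNonSerializableClass(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return Optional.empty();
        } catch (NotSerializableException e) {
            // The exception message is the name of the offending class.
            return Optional.of(e.getMessage());
        } catch (IOException e) {
            throw new IllegalStateException("Unexpected I/O failure", e);
        }
    }

    public static void main(String[] args) {
        List<Object> ok = new ArrayList<>();
        ok.add("index-name");        // String is Serializable

        List<Object> broken = new ArrayList<>();
        broken.add(new Object());    // java.lang.Object is not Serializable

        System.out.println(findNonSerializableClass(ok));     // Optional.empty
        System.out.println(findNonSerializableClass(broken)); // Optional[java.lang.Object]
    }
}
```

Calling it on `new ResultIndexRequestBuilder(appConfiguration)` should report the first non-serializable class reachable from the builder.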
Code Snippet
```
private ElasticsearchSink<Result> sinkToElasticSearch(AppConfiguration appConfiguration) throws Exception {

    String host = appConfiguration.getPipeline().getElasticSearch().getHost();
    int port = appConfiguration.getPipeline().getElasticSearch().getPort();
    String cluster = appConfiguration.getPipeline().getElasticSearch().getCluster();

    Map<String, String> config = new HashMap<>();
    config.put("bulk.flush.max.actions", "1"); // buffer at most one action, i.e. flush after every element
    config.put("cluster.name", cluster);

    List<TransportAddress> transports = new ArrayList<>();
    transports.add(new InetSocketTransportAddress(host, port));

    return new ElasticsearchSink<>(config, transports, new ResultIndexRequestBuilder(appConfiguration));
}

public class ResultIndexRequestBuilder implements IndexRequestBuilder<Result>, Serializable {

    private String index;
    private String type;
    //private transient Gson gson = new Gson();

    public ResultIndexRequestBuilder() {}

    public ResultIndexRequestBuilder(AppConfiguration appConfiguration) {
        index = appConfiguration.getPipeline().getElasticSearch().getIndex();
        type = appConfiguration.getPipeline().getElasticSearch().getType();
    }

    @Override
    public IndexRequest createIndexRequest(Result result, RuntimeContext ctx) {
        // Serialize the Result to JSON and store it under a single "data" field
        Gson gson = new Gson();
        String resultAsJson = gson.toJson(result);
        System.out.println(resultAsJson);
        Map<String, String> jsonMap = new HashMap<>();
        jsonMap.put("data", resultAsJson);

        return Requests.indexRequest()
                .index(index)
                .type(type)
                .source(jsonMap);
    }
}
```
Regards
Vijay

Re: ElasticsearchSink Serialization Error

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
ResultIndexRequestBuilder is a non-static inner class. This means it holds an implicit reference to the enclosing instance, so serializing the builder also tries to serialize that enclosing object. If you make it a static nested class, your code should work.
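A minimal, self-contained illustration of the difference, assuming a plain-Java setup without Flink: `EnclosingJob`, `InnerBuilder` and `StaticBuilder` are hypothetical stand-ins for the job class and the builder. The inner class gets a hidden field pointing at its `EnclosingJob` instance, which is not Serializable, so serialization fails; the static nested class carries only its own `String` field.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the class that declares the builder.
// Like a typical job class, it does NOT implement Serializable.
public class EnclosingJob {

    private final String index;

    public EnclosingJob(String index) {
        this.index = index;
    }

    // Non-static inner class: the compiler adds a hidden field referencing
    // the enclosing EnclosingJob instance (used here to read `index`).
    public class InnerBuilder implements Serializable {
        String describe() { return index; }
    }

    // Static nested class: no hidden reference to EnclosingJob,
    // so only its own fields are serialized.
    public static class StaticBuilder implements Serializable {
        private final String index;

        public StaticBuilder(String index) { this.index = index; }

        String describe() { return index; }
    }

    // Returns true if the object can be Java-serialized.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // includes NotSerializableException
            return false;
        }
    }

    public static void main(String[] args) {
        EnclosingJob job = new EnclosingJob("results");
        System.out.println(serializes(job.new InnerBuilder()));       // false
        System.out.println(serializes(new StaticBuilder(job.index))); // true
    }
}
```

Applied to the code in the question, this means declaring the builder as `public static class ResultIndexRequestBuilder implements IndexRequestBuilder<Result>, Serializable`. Its remaining fields are two Strings, and the Gson instance is created inside `createIndexRequest`, so nothing else non-serializable should be captured.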

Best,
Aljoscha