You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Averell <lv...@gmail.com> on 2018/08/31 06:10:37 UTC

ElasticSearch 6 - error with UpdateRequest

Good day everyone,

I tried to send UpdateRequest(s) to ElasticSearch6, and I got the following
error:

Caused by: java.lang.NoSuchMethodError:
org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
	at
org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:76)

Below is my ElasticsearchSinkFunction:

	import org.elasticsearch.action.update.UpdateRequest
	def upsertRequest(element: T): UpdateRequest = {
		new UpdateRequest(
			"myIndex",
			"record",
			s"${element.id}")
	        	.doc(element.toMap())
	}
	override def process(element: T, runtimeContext: RuntimeContext,
requestIndexer: RequestIndexer): Unit = {
		requestIndexer.add(upsertRequest(element))
	}

What could be the issue here?

Thanks for your help.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: ElasticSearch 6 - error with UpdateRequest

Posted by Timo Walther <tw...@apache.org>.
The problem is that BulkProcessorIndexer is located in 
flink-connector-elasticsearch-base which is compiled against a very old 
ES version. This old version is source code compatible but apparently 
not binary compatible with newer Elasticsearch classes. By copying this 
class you force to compile the class against ES 6 and don't use the old 
class in the base module.

The fix will include to improve the API call bridge. As done here [1].

Regards,
Timo

[1] https://github.com/apache/flink/pull/6611


Am 31.08.18 um 09:06 schrieb Averell:
> Hi Timo,
>
> Thanks for your help. I don't get that error anymore after putting that file
> into my project.
> However, I don't understand how it could help. I have been using the Flink
> binary built on my same laptop, then how could it be different between
> having that java class in Flink project vs in my project?
> If you have some spare time, please help explain.
>
> I also would like to know the other way to fix that issue (that you
> implemented in your branch).
>
> Thanks a lot for your help.
> Regards,
> Averell
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: ElasticSearch 6 - error with UpdateRequest

Posted by Averell <lv...@gmail.com>.
Hi Timo,

Thanks for your help. I don't get that error anymore after putting that file
into my project.
However, I don't understand how it could help. I have been using the Flink
binary built on my same laptop, then how could it be different between
having that java class in Flink project vs in my project?
If you have some spare time, please help explain.

I also would like to know the other way to fix that issue (that you
implemented in your branch).

Thanks a lot for your help.
Regards,
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: ElasticSearch 6 - error with UpdateRequest

Posted by Timo Walther <tw...@apache.org>.
Hi Averell,

sorry for my wrong other mail.

I also observed this issue when implementing FLINK-3875. Currently, 
update requests are broken due to a binary incompatibility. I already 
have a fix for this in a different branch. I opened FLINK-10269 [1] to 
track the issue.

As a work around you can simply copy 
org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer 
to your project. This should ensure that the class is compiled 
correctly. If it doesn't help, please let us know.

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-10269

Am 31.08.18 um 08:10 schrieb Averell:
> Good day everyone,
>
> I tried to send UpdateRequest(s) to ElasticSearch6, and I got the following
> error:
>
> Caused by: java.lang.NoSuchMethodError:
> org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
> 	at
> org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:76)
>
> Below is my ElasticsearchSinkFunction:
>
> 	import org.elasticsearch.action.update.UpdateRequest
> 	def upsertRequest(element: T): UpdateRequest = {
> 		new UpdateRequest(
> 			"myIndex",
> 			"record",
> 			s"${element.id}")
> 	        	.doc(element.toMap())
> 	}
> 	override def process(element: T, runtimeContext: RuntimeContext,
> requestIndexer: RequestIndexer): Unit = {
> 		requestIndexer.add(upsertRequest(element))
> 	}
>
> What could be the issue here?
>
> Thanks for your help.
>
> Regards,
> Averell
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: ElasticSearch 6 - error with UpdateRequest

Posted by Timo Walther <tw...@apache.org>.
Hi,

thanks for your feedback. I agree that the the current interfaces are 
not flexible enough to fit to every use case. The unified connector API 
is a a very recent feature that still needs some polishing. I'm working 
on a design document to improve the situation there.

For now, you can simply implement some utitilty method that just 
iterates over column names and types of TableSchema and calls 
`schema.field(name, type)`

I hope this helps.

Regards,
Timo


Am 31.08.18 um 08:10 schrieb Averell:
> Good day everyone,
>
> I tried to send UpdateRequest(s) to ElasticSearch6, and I got the following
> error:
>
> Caused by: java.lang.NoSuchMethodError:
> org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
> 	at
> org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:76)
>
> Below is my ElasticsearchSinkFunction:
>
> 	import org.elasticsearch.action.update.UpdateRequest
> 	def upsertRequest(element: T): UpdateRequest = {
> 		new UpdateRequest(
> 			"myIndex",
> 			"record",
> 			s"${element.id}")
> 	        	.doc(element.toMap())
> 	}
> 	override def process(element: T, runtimeContext: RuntimeContext,
> requestIndexer: RequestIndexer): Unit = {
> 		requestIndexer.add(upsertRequest(element))
> 	}
>
> What could be the issue here?
>
> Thanks for your help.
>
> Regards,
> Averell
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/