You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Raj Kumar <sm...@gmail.com> on 2017/08/30 03:40:54 UTC
Elasticsearch Sink - Error
Hi,
I am using elasticsearch 5.4.3 version in my flink project(flink version
1.3.1)
Details
1. Using Maven build tool.
2. Running from intellij IDE.
3. Elasticsearch is running on the local machine.
Have added the following maven dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch5_2.10</artifactId>
<version>1.3.1</version>
</dependency>
*code added*
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "elasticsearch");
config.put("bulk.flush.max.actions", "1");
List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
alerts.addSink(new ElasticsearchSink<AggResult>(config,
transportAddresses, new ElasticsearchSinkFunction<AggResult>() {
public IndexRequest createIndexRequest(AggResult aggResult){
Map<String, Long> json = new HashMap<>();
json.put("totalCount", aggResult.getTotalCount());
return Requests
.indexRequest()
.index("logdata")
.type("consolidatedStreamData")
.source(json);
}
@Override
public void process(AggResult aggResult, RuntimeContext
runtimeContext, RequestIndexer requestIndexer) {
requestIndexer.add(createIndexRequest(aggResult));
}
}));
*This results in the following error.*
Caused by: java.lang.NoSuchMethodError:
org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
at
org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
at ECSPrototype$2.process(ECSPrototype.java:148)
at ECSPrototype$2.process(ECSPrototype.java:134)
at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:327)
at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:303)
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Anyidea what is wrong here ?
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Elasticsearch Sink - Error
Posted by Fabian Hueske <fh...@gmail.com>.
That's correct Flavio.
The issue has been reported as
https://issues.apache.org/jira/browse/FLINK-7386
Best, Fabian
2017-08-30 9:21 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
> I also had problems with ES 5.4.3 and I had to modify the connector
> code...I fear that the code is compatible only up to ES 5.2 or similar..
>
> On Wed, Aug 30, 2017 at 5:40 AM, Raj Kumar <sm...@gmail.com>
> wrote:
>
>> Hi,
>> I am using elasticsearch 5.4.3 version in my flink project(flink version
>> 1.3.1)
>> Details
>> 1. Using Maven build tool.
>> 2. Running from intellij IDE.
>> 3. Elasticsearch is running on the local machine.
>>
>> Have added the following maven dependency
>>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
>> <version>1.3.1</version>
>> </dependency>
>>
>>
>> *code added*
>>
>> Map<String, String> config = new HashMap<>();
>> config.put("cluster.name", "elasticsearch");
>> config.put("bulk.flush.max.actions", "1");
>>
>> List<InetSocketAddress> transportAddresses = new
>> ArrayList<>();
>> transportAddresses.add(new
>> InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
>>
>> alerts.addSink(new ElasticsearchSink<AggResult>(config,
>> transportAddresses, new ElasticsearchSinkFunction<AggResult>() {
>> public IndexRequest createIndexRequest(AggResult
>> aggResult){
>> Map<String, Long> json = new HashMap<>();
>> json.put("totalCount", aggResult.getTotalCount());
>>
>> return Requests
>> .indexRequest()
>> .index("logdata")
>> .type("consolidatedStreamData")
>> .source(json);
>>
>> }
>> @Override
>> public void process(AggResult aggResult, RuntimeContext
>> runtimeContext, RequestIndexer requestIndexer) {
>> requestIndexer.add(createIndexRequest(aggResult));
>> }
>> }));
>>
>>
>>
>> *This results in the following error.*
>>
>> Caused by: java.lang.NoSuchMethodError:
>> org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elastic
>> search/action/ActionRequest;)Lorg/elasticsearch/action/
>> bulk/BulkProcessor;
>> at
>> org.apache.flink.streaming.connectors.elasticsearch.BulkProc
>> essorIndexer.add(BulkProcessorIndexer.java:52)
>> at ECSPrototype$2.process(ECSPrototype.java:148)
>> at ECSPrototype$2.process(ECSPrototype.java:134)
>> at
>> org.apache.flink.streaming.connectors.elasticsearch.Elastics
>> earchSinkBase.invoke(ElasticsearchSinkBase.java:282)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.processE
>> lement(StreamSink.java:41)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
>> ngChainingOutput.pushToOperator(OperatorChain.java:528)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
>> ngChainingOutput.collect(OperatorChain.java:503)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
>> ngChainingOutput.collect(OperatorChain.java:483)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$Broad
>> castingOutputCollector.collect(OperatorChain.java:575)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$Broad
>> castingOutputCollector.collect(OperatorChain.java:536)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor$CountingOutput.collect(AbstractStreamOperator.java:891)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor$CountingOutput.collect(AbstractStreamOperator.java:869)
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollecto
>> r.collect(TimestampedCollector.java:51)
>> at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.ja
>> va:327)
>> at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.ja
>> va:303)
>> at
>> org.apache.flink.streaming.api.operators.KeyedProcessOperato
>> r.processElement(KeyedProcessOperator.java:94)
>> at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.p
>> rocessInput(StreamInputProcessor.java:206)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.
>> run(OneInputStreamTask.java:69)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:263)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>> Anyidea what is wrong here ?
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 1823908 <+39%200461%20182%203908>
>
Re: Elasticsearch Sink - Error
Posted by Flavio Pompermaier <po...@okkam.it>.
I also had problems with ES 5.4.3 and I had to modify the connector
code...I fear that the code is compatible only up to ES 5.2 or similar..
On Wed, Aug 30, 2017 at 5:40 AM, Raj Kumar <sm...@gmail.com>
wrote:
> Hi,
> I am using elasticsearch 5.4.3 version in my flink project(flink version
> 1.3.1)
> Details
> 1. Using Maven build tool.
> 2. Running from intellij IDE.
> 3. Elasticsearch is running on the local machine.
>
> Have added the following maven dependency
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
> <version>1.3.1</version>
> </dependency>
>
>
> *code added*
>
> Map<String, String> config = new HashMap<>();
> config.put("cluster.name", "elasticsearch");
> config.put("bulk.flush.max.actions", "1");
>
> List<InetSocketAddress> transportAddresses = new ArrayList<>();
> transportAddresses.add(new
> InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
>
> alerts.addSink(new ElasticsearchSink<AggResult>(config,
> transportAddresses, new ElasticsearchSinkFunction<AggResult>() {
> public IndexRequest createIndexRequest(AggResult
> aggResult){
> Map<String, Long> json = new HashMap<>();
> json.put("totalCount", aggResult.getTotalCount());
>
> return Requests
> .indexRequest()
> .index("logdata")
> .type("consolidatedStreamData")
> .source(json);
>
> }
> @Override
> public void process(AggResult aggResult, RuntimeContext
> runtimeContext, RequestIndexer requestIndexer) {
> requestIndexer.add(createIndexRequest(aggResult));
> }
> }));
>
>
>
> *This results in the following error.*
>
> Caused by: java.lang.NoSuchMethodError:
> org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/
> ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
> at
> org.apache.flink.streaming.connectors.elasticsearch.
> BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
> at ECSPrototype$2.process(ECSPrototype.java:148)
> at ECSPrototype$2.process(ECSPrototype.java:134)
> at
> org.apache.flink.streaming.connectors.elasticsearch.
> ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282)
> at
> org.apache.flink.streaming.api.operators.StreamSink.
> processElement(StreamSink.java:41)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:503)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> CopyingChainingOutput.collect(OperatorChain.java:483)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:575)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$
> BroadcastingOutputCollector.collect(OperatorChain.java:536)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:891)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$
> CountingOutput.collect(AbstractStreamOperator.java:869)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(
> TimestampedCollector.java:51)
> at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.
> java:327)
> at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.
> java:303)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.
> processElement(KeyedProcessOperator.java:94)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(
> StreamInputProcessor.java:206)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:69)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
>
>
> Anyidea what is wrong here ?
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
--
Flavio Pompermaier
Development Department
OKKAM S.r.l.
Tel. +(39) 0461 1823908