Posted to user@flink.apache.org by Fábio Dias <fa...@gmail.com> on 2017/03/02 14:47:36 UTC
Elasticsearch 5.x connection
Hi,
Last Friday I was running Elasticsearch 5.x with Flink 1.2.0.
In the pom.xml I added this dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
    <version>1.3-SNAPSHOT</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
I also added two files, ElasticsearchSink.java and Elasticsearch5ApiCallBridge.java, from the Flink GitHub repository.
With that setup, this code was running with no problem:
public static void writeToElastic(DataStream<HashMap<String, Object>> elasticStream) {
    HashMap<String, String> config = new HashMap<>();
    config.put("bulk.flush.max.actions", "1");
    config.put("cluster.name", "clouduxlogs");
    try {
        ArrayList<InetSocketAddress> transports = new ArrayList<>();
        transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));
        ElasticsearchSinkFunction<HashMap<String, Object>> indexLog =
                new ElasticsearchSinkFunction<HashMap<String, Object>>() {
            private static final long serialVersionUID = 8802869701292023100L;
            public IndexRequest createIndexRequest(HashMap<String, Object> element) {
                HashMap<String, HashMap<String, Object>> valueOfLog = new HashMap<>();
                element.put("timestamp", (new Timestamp((new Date()).getTime())).toString());
                valueOfLog.put("data", element);
                //{aggregation : { aggregationType : "value", "value" : 468, "count" : 1, "timestamp": } }
                return Requests
                        .indexRequest()
                        .index("logs")
                        .type("object")
                        .source(valueOfLog);
            }
            public void process(HashMap<String, Object> element, RuntimeContext ctx,
                    RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        };
        SinkFunction<HashMap<String, Object>> esSink =
                new ElasticsearchSink<HashMap<String, Object>>(config, transports, indexLog);
        elasticStream.addSink(esSink);
    }
    catch (Exception e) {
        System.out.println(e);
    }
}
But on Monday those files (ElasticsearchSink.java and Elasticsearch5ApiCallBridge.java) were changed, and now my code doesn't work.
I have tried to use this dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
    <version>1.3-SNAPSHOT</version>
</dependency>
but I'm getting this error:
java.lang.NoSuchMethodError: org.apache.flink.util.InstantiationUtil.isSerializable(Ljava/lang/Object;)Z
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:195)
at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:95)
at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:78)
at ux.App.writeToElastic(App.java:102)
at ux.App.main(App.java:55)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Do I need to downgrade my Elasticsearch version, or is there some other way to make it work?
Thanks,
Fábio Dias.
Re: Elasticsearch 5.x connection
Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,
java.lang.NoSuchMethodError: org.apache.flink.util.InstantiationUtil.isSerializable(Ljava/lang/Object;)Z
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.<init>(ElasticsearchSinkBase.java:195)
at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(ElasticsearchSink.java:95)
at org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink.<init>(
This exception occurs because the `isSerializable` method currently exists only in the 1.3-SNAPSHOT version of `flink-core`. Errors of this kind can usually be expected when the versions of the Flink libraries you use do not match the version of the core Flink dependencies.
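One way to confirm this kind of mismatch is to check by reflection whether the method is present in the classes actually on the classpath. The following is only a minimal sketch: the `MethodCheck` class and its `hasPublicMethod` helper are hypothetical names introduced for illustration and are not part of Flink. It uses only standard Java reflection calls.

```java
public final class MethodCheck {

    private MethodCheck() {}

    /**
     * Returns true if the named class can be loaded and declares or inherits
     * a public method with the given name and parameter types.
     * (Hypothetical helper for illustration, not a Flink API.)
     */
    public static boolean hasPublicMethod(String className, String methodName,
                                          Class<?>... parameterTypes) {
        try {
            // Load without running static initializers; only the method table is needed.
            Class<?> clazz = Class.forName(className, false, MethodCheck.class.getClassLoader());
            clazz.getMethod(methodName, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class and method names are taken from the stack trace in this thread.
        boolean present = hasPublicMethod(
                "org.apache.flink.util.InstantiationUtil", "isSerializable", Object.class);
        System.out.println("isSerializable(Object) available: " + present);
    }
}
```

If this reports that the method is missing when run with the same classpath as the job, the `flink-core` on that classpath is probably older than the one the connector was compiled against.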
Elasticsearch 5 support will be released with Flink 1.3.0 (the targeted release time is the end of May). For the time being, if Elasticsearch 5 is a must, you could try implementing a copy of the `isSerializable` method under the exact same package path, class name, and method name in your own project. However, I cannot guarantee that this will work, as there may be other conflicts.
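As a rough illustration of that workaround, the sketch below shows what a stand-in for `isSerializable` might look like. It is an assumption about the method's behavior (try Java serialization and report whether it succeeds), not a copy of the Flink source. The package declaration is commented out so the sketch compiles on its own. In a real project the class would have to live in `org.apache.flink.util`, and since a shadowing class replaces the whole class rather than adding one method, any other `InstantiationUtil` methods the job relies on would need to be provided as well.

```java
// package org.apache.flink.util; // assumed target package; commented out so this compiles standalone

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/** Illustrative stand-in only; the real Flink class contains many more methods. */
public final class InstantiationUtil {

    private InstantiationUtil() {}

    /**
     * Returns true if the object can be written with standard Java serialization.
     * Assumed behavior, modeled on the method signature in the stack trace:
     * isSerializable(Ljava/lang/Object;)Z
     */
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }
}
```

Which of the two class definitions gets loaded depends on classpath order and on how the job jar is packaged, which is one reason this approach may fail or behave differently between a local run and a cluster.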
- Gordon