Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2015/08/21 17:36:04 UTC

[GitHub] flink pull request: [FLINK-2558] Add Streaming Connector for Elast...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/1040

    [FLINK-2558] Add Streaming Connector for Elasticsearch

    This adds an ITCase, an example and documentation.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink elastic-search

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1040.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1040
    
----
commit 19501de53b3a0366fd68a1ad00636ad0ba7e13aa
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2015-08-21T12:06:04Z

    [FLINK-2558] Add Streaming Connector for Elasticsearch

----


---

[GitHub] flink pull request: [FLINK-2558] Add Streaming Connector for Elast...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:

    https://github.com/apache/flink/pull/1040


---

[GitHub] flink pull request: [FLINK-2558] Add Streaming Connector for Elast...

Posted by HungUnicorn <gi...@git.apache.org>.
Github user HungUnicorn commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1040#discussion_r55013666
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java ---
    @@ -0,0 +1,315 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.elasticsearch;
    +
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkProcessor;
    +import org.elasticsearch.action.bulk.BulkRequest;
    +import org.elasticsearch.action.bulk.BulkResponse;
    +import org.elasticsearch.action.index.IndexRequest;
    +import org.elasticsearch.client.Client;
    +import org.elasticsearch.client.transport.TransportClient;
    +import org.elasticsearch.cluster.node.DiscoveryNode;
    +import org.elasticsearch.common.collect.ImmutableList;
    +import org.elasticsearch.common.settings.ImmutableSettings;
    +import org.elasticsearch.common.settings.Settings;
    +import org.elasticsearch.common.transport.TransportAddress;
    +import org.elasticsearch.common.unit.ByteSizeUnit;
    +import org.elasticsearch.common.unit.ByteSizeValue;
    +import org.elasticsearch.common.unit.TimeValue;
    +import org.elasticsearch.node.Node;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
    +
    +
    +/**
    + * Sink that emits its input elements to an Elasticsearch cluster.
    + *
    + * <p>
    + * When using the first constructor {@link #ElasticsearchSink(java.util.Map, IndexRequestBuilder)}
    + * the sink will create a local {@link Node} for communicating with the
    + * Elasticsearch cluster. When using the second constructor
     + * {@link #ElasticsearchSink(java.util.Map, java.util.List, IndexRequestBuilder)} a {@link TransportClient} will
    + * be used instead.
    + *
    + * <p>
     + * <b>Attention:</b> When using the {@code TransportClient} the sink will fail if it cannot
     + * connect to a cluster. With the {@code Node Client} the sink will block and wait for a cluster
     + * to come online.
    + *
    + * <p>
    + * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
    + * the {@link Node} or {@link TransportClient}. The config keys can be found in the Elasticsearch
     + * documentation. An important setting is {@code cluster.name}; this should be set to the name
    + * of the cluster that the sink should emit to.
    + *
    + * <p>
    + * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
    + * This will buffer elements before sending a request to the cluster. The behaviour of the
    + * {@code BulkProcessor} can be configured using these config keys:
    + * <ul>
     + *   <li> {@code bulk.flush.max.actions}: Maximum number of elements to buffer
     + *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
     + *   <li> {@code bulk.flush.interval.ms}: Interval (in milliseconds) at which to flush data
     + *   regardless of the other two settings
    + * </ul>
    + *
    + * <p>
    + * You also have to provide an {@link IndexRequestBuilder}. This is used to create an
    + * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
    + * {@link org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder} for an example.
    + *
    + * @param <T> Type of the elements emitted by this sink
    + */
    +public class ElasticsearchSink<T> extends RichSinkFunction<T> {
    +
    +	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
    +	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
    +	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
    +
    +	/**
    +	 * The user specified config map that we forward to Elasticsearch when we create the Client.
    +	 */
    +	private final Map<String, String> userConfig;
    +
    +	/**
    +	 * The list of nodes that the TransportClient should connect to. This is null if we are using
    +	 * an embedded Node to get a Client.
    +	 */
    +	private final List<TransportAddress> transportNodes;
    --- End diff --
    
    Can I ask whether it would need to be `transient` rather than `final`? I ran into the issue that it is not serializable while working on the `elasticsearch2` connector. However, when I make it `transient`, `transportNodes` is always `null` in `open()`, though I'm not totally sure that `transient` is the reason.
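    
    For background: Flink serializes the sink function and ships it to the workers, so every non-transient field has to be serializable. A `transient` field is skipped during serialization and comes back as `null` on the workers, which is why `transportNodes` ends up `null` in `open()`; transient fields have to be rebuilt there. A minimal sketch of that pattern, with hypothetical class and field names, against the ES 1.x client API used in this PR:
    
        import java.util.List;
    
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
        import org.elasticsearch.client.Client;
        import org.elasticsearch.client.transport.TransportClient;
        import org.elasticsearch.common.transport.InetSocketTransportAddress;
    
        // Sketch: keep only serializable state in fields and build the client in open().
        public class AddressRebuildingSink<T> extends RichSinkFunction<T> {
    
            // Serializable "host:port" strings instead of TransportAddress instances.
            private final List<String> hostPorts;
    
            // Not serializable, so transient; rebuilt in open() on every parallel instance.
            private transient Client client;
    
            public AddressRebuildingSink(List<String> hostPorts) {
                this.hostPorts = hostPorts;
            }
    
            @Override
            public void open(Configuration parameters) {
                TransportClient transportClient = new TransportClient();
                for (String hostPort : hostPorts) {
                    String[] parts = hostPort.split(":");
                    transportClient.addTransportAddress(
                            new InetSocketTransportAddress(parts[0], Integer.parseInt(parts[1])));
                }
                client = transportClient;
            }
    
            @Override
            public void invoke(T value) throws Exception {
                // Use client to index the element here.
            }
    
            @Override
            public void close() {
                if (client != null) {
                    client.close();
                }
            }
        }
    
    Either way, the addresses have to be stored in a serializable form or rebuilt in `open()` from serializable configuration; that is exactly the trade-off behind this question.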


---

[GitHub] flink pull request: [FLINK-2558] Add Streaming Connector for Elast...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1040#discussion_r37697052
  
    --- Diff: docs/apis/streaming_guide.md ---
    @@ -1517,11 +1517,24 @@ Stream connectors
     ----------------
     
     <!-- TODO: reintroduce flume -->
    -Connectors provide an interface for accessing data from various third party sources (message queues). Currently three connectors are natively supported, namely [Apache Kafka](https://kafka.apache.org/),  [RabbitMQ](http://www.rabbitmq.com/) and the [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis).
    +Connectors provide code for interfacing with various third-party systems.
    +Currently these systems are supported:
     
    -Typically the connector packages consist of a source and sink class (with the exception of Twitter where only a source is provided). To use these sources the user needs to pass Serialization/Deserialization schemas for the connectors for the desired types. (Or use some predefined ones)
    + * [Apache Kafka](https://kafka.apache.org/)
    --- End diff --
    
    Should we add source/sink, to show which types of connectors are implemented for which systems?
    
    We can also add Java/Scala collections and file systems (HDFS) to that list.


---

[GitHub] flink pull request: [FLINK-2558] Add Streaming Connector for Elast...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1040#issuecomment-133465114
  
    I'm currently setting up a cluster to test this against an actual Elasticsearch cluster. Once I have the results, I'll post them here.


---

[GitHub] flink pull request: [FLINK-2558] Add Streaming Connector for Elast...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1040#issuecomment-133718325
  
    Added two minor comments on the docs/interface, but treat those as optional; they are a matter of taste anyway.


---

[GitHub] flink pull request: [FLINK-2558] Add Streaming Connector for Elast...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1040#discussion_r37698000
  
    --- Diff: docs/apis/streaming_guide.md ---
    @@ -1661,6 +1674,165 @@ More about Kafka can be found [here](https://kafka.apache.org/documentation.html
     
     [Back to top](#top)
     
    +### Elasticsearch
    +
    +This connector provides a Sink that can write to an
    +[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
    +following dependency to your project:
    +
    +{% highlight xml %}
    +<dependency>
    +  <groupId>org.apache.flink</groupId>
    +  <artifactId>flink-connector-elasticsearch</artifactId>
    +  <version>{{site.version }}</version>
    +</dependency>
    +{% endhighlight %}
    +
    +Note that the streaming connectors are currently not part of the binary
    +distribution. See
    +[here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
    +for information about how to package the program with the libraries for
    +cluster execution.
    +
    +#### Installing Elasticsearch
    +
    +Instructions for setting up an Elasticsearch cluster can be found
    +[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
    +Make sure to set and remember a cluster name. This must be set when
     +creating a Sink for writing to your cluster.
    +
    +#### Elasticsearch Sink
    +The connector provides a Sink that can send data to an Elasticsearch Index.
    +
    +The sink can use two different methods for communicating with Elasticsearch:
    +
    +1. An embedded Node
    +2. The TransportClient
    +
    +See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
    +for information about the differences between the two modes.
    +
    +This code shows how to create a sink that uses an embedded Node for
    +communication:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<String> input = ...;
    +
    +Map<String, String> config = Maps.newHashMap();
     +// This instructs the sink to emit after every element; otherwise they would be buffered
    +config.put("bulk.flush.max.actions", "1");
    +config.put("cluster.name", "my-cluster-name");
    +
    +input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
    +    @Override
    +    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
    --- End diff --
    
    The `ElasticsearchSink` is rich already. The `IndexRequestBuilder` is more like a souped-up key selector that gives the user the power to specify in great detail how they want their element added to Elasticsearch. I admit the function signature is a bit strange, but I didn't want to go full-blown RichFunction for the `IndexRequestBuilder`.
    
    Should we change it? Users would then also expect to be able to make it stateful, along with all the other things that come with rich functions.
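    
    For reference, the review view truncates the docs example right at `createIndexRequest`; a complete builder along the lines of the one being discussed might look like this (the index and type names are placeholders):
    
        import java.util.HashMap;
        import java.util.Map;
    
        import org.apache.flink.api.common.functions.RuntimeContext;
        import org.elasticsearch.action.index.IndexRequest;
        import org.elasticsearch.client.Requests;
    
        IndexRequestBuilder<String> builder = new IndexRequestBuilder<String>() {
            @Override
            public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
                Map<String, Object> json = new HashMap<>();
                json.put("data", element);
    
                // Requests.indexRequest() creates a fresh IndexRequest;
                // index and type determine where the document is stored.
                return Requests.indexRequest()
                        .index("my-index")
                        .type("my-type")
                        .source(json);
            }
        };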


---

[GitHub] flink pull request: [FLINK-2558] Add Streaming Connector for Elast...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1040#issuecomment-133515409
  
    I'm happy to report that I managed to set up a distributed Elasticsearch cluster and the sink did work with it :smile: 


---

[GitHub] flink pull request: [FLINK-2558] Add Streaming Connector for Elast...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1040#issuecomment-133718276
  
    This looks pretty nice :-) Docs, tests, good comments, cluster tests!
    
    +1 to merge this!


---

[GitHub] flink pull request: [FLINK-2558] Add Streaming Connector for Elast...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1040#discussion_r37697064
  
    --- Diff: docs/apis/streaming_guide.md ---
    @@ -1661,6 +1674,165 @@ More about Kafka can be found [here](https://kafka.apache.org/documentation.html
     
     [Back to top](#top)
     
    +### Elasticsearch
    +
    +This connector provides a Sink that can write to an
    +[Elasticsearch](https://elastic.co/) Index. To use this connector, add the
    +following dependency to your project:
    +
    +{% highlight xml %}
    +<dependency>
    +  <groupId>org.apache.flink</groupId>
    +  <artifactId>flink-connector-elasticsearch</artifactId>
    +  <version>{{site.version }}</version>
    +</dependency>
    +{% endhighlight %}
    +
    +Note that the streaming connectors are currently not part of the binary
    +distribution. See
    +[here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
    +for information about how to package the program with the libraries for
    +cluster execution.
    +
    +#### Installing Elasticsearch
    +
    +Instructions for setting up an Elasticsearch cluster can be found
    +[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
    +Make sure to set and remember a cluster name. This must be set when
     +creating a Sink for writing to your cluster.
    +
    +#### Elasticsearch Sink
    +The connector provides a Sink that can send data to an Elasticsearch Index.
    +
    +The sink can use two different methods for communicating with Elasticsearch:
    +
    +1. An embedded Node
    +2. The TransportClient
    +
    +See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
    +for information about the differences between the two modes.
    +
    +This code shows how to create a sink that uses an embedded Node for
    +communication:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<String> input = ...;
    +
    +Map<String, String> config = Maps.newHashMap();
     +// This instructs the sink to emit after every element; otherwise they would be buffered
    +config.put("bulk.flush.max.actions", "1");
    +config.put("cluster.name", "my-cluster-name");
    +
    +input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
    +    @Override
    +    public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
    --- End diff --
    
    Can we make the ElasticsearchSink rich? Then it has access to the RuntimeContext anyway, and it would not need to be passed in every call.
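    
    One thing the per-call `RuntimeContext` parameter gives the builder without making it rich is access to runtime information inside `createIndexRequest` itself. A hypothetical variation on the docs example that tags each document with the parallel subtask that produced it:
    
        new IndexRequestBuilder<String>() {
            @Override
            public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
                Map<String, Object> json = new HashMap<>();
                json.put("data", element);
                // getIndexOfThisSubtask() comes from the RuntimeContext passed per call.
                json.put("subtask", ctx.getIndexOfThisSubtask());
                return Requests.indexRequest()
                        .index("my-index")
                        .type("my-type")
                        .source(json);
            }
        };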


---

[GitHub] flink pull request: [FLINK-2558] Add Streaming Connector for Elast...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1040#discussion_r37697987
  
    --- Diff: docs/apis/streaming_guide.md ---
    @@ -1517,11 +1517,24 @@ Stream connectors
     ----------------
     
     <!-- TODO: reintroduce flume -->
    -Connectors provide an interface for accessing data from various third party sources (message queues). Currently three connectors are natively supported, namely [Apache Kafka](https://kafka.apache.org/),  [RabbitMQ](http://www.rabbitmq.com/) and the [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis).
    +Connectors provide code for interfacing with various third-party systems.
    +Currently these systems are supported:
     
    -Typically the connector packages consist of a source and sink class (with the exception of Twitter where only a source is provided). To use these sources the user needs to pass Serialization/Deserialization schemas for the connectors for the desired types. (Or use some predefined ones)
    + * [Apache Kafka](https://kafka.apache.org/)
    --- End diff --
    
    The source/sink information would be helpful. The documentation might need a restructuring anyway; the information about sources and sinks is scattered across the "Connecting to the Outside World" and "Connectors" sections.


---

[GitHub] flink pull request: [FLINK-2558] Add Streaming Connector for Elast...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1040#issuecomment-134197297
  
    I updated the doc. If no one objects, I would like to merge this tonight.


---