You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/07 14:47:57 UTC
[2/4] flink git commit: [FLINK-4988] [elasticsearch] Restructure
Elasticsearch connectors
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
new file mode 100644
index 0000000..098afa9
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
+ */
+public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge {
+
+ private static final long serialVersionUID = -2632363720584123682L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch1ApiCallBridge.class);
+
+ /** User-provided transport addresses. This is null if we are using an embedded {@link Node} for communication. */
+ private final List<TransportAddress> transportAddresses;
+
+ /** The embedded {@link Node} used for communication. This is null if we are using a TransportClient. */
+ private transient Node node;
+
+ /**
+ * Constructor for use of an embedded {@link Node} for communication with the Elasticsearch cluster.
+ */
+ Elasticsearch1ApiCallBridge() {
+ this.transportAddresses = null;
+ }
+
+ /**
+ * Constructor for use of a {@link TransportClient} for communication with the Elasticsearch cluster.
+ */
+ Elasticsearch1ApiCallBridge(List<TransportAddress> transportAddresses) {
+ Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
+ this.transportAddresses = transportAddresses;
+ }
+
+ @Override
+ public Client createClient(Map<String, String> clientConfig) {
+ if (transportAddresses == null) {
+
+ // Make sure that we disable http access to our embedded node
+ Settings settings = settingsBuilder()
+ .put(clientConfig)
+ .put("http.enabled", false)
+ .build();
+
+ node = nodeBuilder()
+ .settings(settings)
+ .client(true)
+ .data(false)
+ .node();
+
+ Client client = node.client();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Created Elasticsearch client from embedded node");
+ }
+
+ return client;
+ } else {
+ Settings settings = settingsBuilder()
+ .put(clientConfig)
+ .build();
+
+ TransportClient transportClient = new TransportClient(settings);
+ for (TransportAddress transport: transportAddresses) {
+ transportClient.addTransportAddress(transport);
+ }
+
+ // verify that we actually are connected to a cluster
+ if (transportClient.connectedNodes().isEmpty()) {
+ throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes());
+ }
+
+ return transportClient;
+ }
+ }
+
+ @Override
+ public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+ if (!bulkItemResponse.isFailed()) {
+ return null;
+ } else {
+ return new RuntimeException(bulkItemResponse.getFailureMessage());
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ if (node != null && !node.isClosed()) {
+ node.close();
+ node = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
index ac14ade..c338860 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -17,59 +17,38 @@
package org.apache.flink.streaming.connectors.elasticsearch;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.collect.ImmutableList;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
-
/**
- * Sink that emits its input elements to an Elasticsearch cluster.
+ * Elasticsearch 1.x sink that requests multiple {@link ActionRequest ActionRequests}
+ * against a cluster for each incoming element.
*
* <p>
- * When using the first constructor {@link #ElasticsearchSink(java.util.Map, IndexRequestBuilder)}
- * the sink will create a local {@link Node} for communicating with the
- * Elasticsearch cluster. When using the second constructor
- * {@link #ElasticsearchSink(java.util.Map, java.util.List, IndexRequestBuilder)} a {@link TransportClient} will
- * be used instead.
+ * When using the first constructor {@link #ElasticsearchSink(java.util.Map, ElasticsearchSinkFunction)}
+ * the sink will create a local {@link Node} for communicating with the Elasticsearch cluster. When using the second
+ * constructor {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)} a
+ * {@link TransportClient} will be used instead.
*
* <p>
* <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster
- * can be connected to. With the {@code Node Client} the sink will block and wait for a cluster
+ * can be connected to. When using the local {@code Node} for communicating, the sink will block and wait for a cluster
* to come online.
*
* <p>
- * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
- * the {@link Node} or {@link TransportClient}. The config keys can be found in the Elasticsearch
- * documentation. An important setting is {@code cluster.name}, this should be set to the name
- * of the cluster that the sink should emit to.
+ * The {@link Map} passed to the constructor is used to create the {@link Node} or {@link TransportClient}. The config
+ * keys can be found in the <a href="https://www.elastic.co">Elasticsearch documentation</a>. An important setting is
+ * {@code cluster.name}, which should be set to the name of the cluster that the sink should emit to.
*
* <p>
- * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
* This will buffer elements before sending a request to the cluster. The behaviour of the
* {@code BulkProcessor} can be configured using these config keys:
* <ul>
@@ -80,236 +59,63 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
* </ul>
*
* <p>
- * You also have to provide an {@link IndexRequestBuilder}. This is used to create an
- * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
- * {@link org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder} for an example.
+ * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
*
- * @param <T> Type of the elements emitted by this sink
+ * @param <T> Type of the elements handled by this sink
*/
-public class ElasticsearchSink<T> extends RichSinkFunction<T> {
-
- public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
- public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
- public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
-
- /**
- * The user specified config map that we forward to Elasticsearch when we create the Client.
- */
- private final Map<String, String> userConfig;
-
- /**
- * The list of nodes that the TransportClient should connect to. This is null if we are using
- * an embedded Node to get a Client.
- */
- private final List<TransportAddress> transportNodes;
-
- /**
- * The builder that is used to construct an {@link IndexRequest} from the incoming element.
- */
- private final IndexRequestBuilder<T> indexRequestBuilder;
-
- /**
- * The embedded Node that is used to communicate with the Elasticsearch cluster. This is null
- * if we are using a TransportClient.
- */
- private transient Node node;
-
/**
- * The Client that was either retrieved from a Node or is a TransportClient.
- */
- private transient Client client;
-
- /**
- * Bulk processor that was created using the client
- */
- private transient BulkProcessor bulkProcessor;
-
- /**
- * This is set from inside the BulkProcessor listener if there where failures in processing.
- */
- private final AtomicBoolean hasFailure = new AtomicBoolean(false);
-
- /**
- * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
- */
- private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
-
- /**
- * Creates a new ElasticsearchSink that connects to the cluster using an embedded Node.
+ * Creates a new {@code ElasticsearchSink} that connects to the cluster using an embedded {@link Node}.
*
- * @param userConfig The map of user settings that are passed when constructing the Node and BulkProcessor
+ * @param userConfig The map of user settings that are used when constructing the {@link Node} and {@link BulkProcessor}
* @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element
+ *
+ * @deprecated Deprecated since version 1.2, to be removed at version 2.0.
+ * Please use {@link ElasticsearchSink#ElasticsearchSink(Map, ElasticsearchSinkFunction)} instead.
*/
+ @Deprecated
public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
- this.userConfig = userConfig;
- this.indexRequestBuilder = indexRequestBuilder;
- transportNodes = null;
+ this(userConfig, new IndexRequestBuilderWrapperFunction<>(indexRequestBuilder));
}
/**
- * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
+ * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
*
- * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor
- * @param transportNodes The Elasticsearch Nodes to which to connect using a {@code TransportClient}
- * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element
+ * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+ * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+ * @param indexRequestBuilder This is used to generate an {@link IndexRequest} from the incoming element
*
+ * @deprecated Deprecated since version 1.2, to be removed at version 2.0.
+ * Please use {@link ElasticsearchSink#ElasticsearchSink(Map, List, ElasticsearchSinkFunction)} instead.
*/
- public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportNodes, IndexRequestBuilder<T> indexRequestBuilder) {
- this.userConfig = userConfig;
- this.indexRequestBuilder = indexRequestBuilder;
- this.transportNodes = transportNodes;
+ @Deprecated
+ public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, IndexRequestBuilder<T> indexRequestBuilder) {
+ this(userConfig, transportAddresses, new IndexRequestBuilderWrapperFunction<>(indexRequestBuilder));
}
/**
- * Initializes the connection to Elasticsearch by either creating an embedded
- * {@link org.elasticsearch.node.Node} and retrieving the
- * {@link org.elasticsearch.client.Client} from it or by creating a
- * {@link org.elasticsearch.client.transport.TransportClient}.
+ * Creates a new {@code ElasticsearchSink} that connects to the cluster using an embedded {@link Node}.
+ *
+ * @param userConfig The map of user settings that are used when constructing the embedded {@link Node} and {@link BulkProcessor}
+ * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest ActionRequests} from the incoming element
*/
- @Override
- public void open(Configuration configuration) {
- if (transportNodes == null) {
- // Make sure that we disable http access to our embedded node
- Settings settings =
- ImmutableSettings.settingsBuilder()
- .put(userConfig)
- .put("http.enabled", false)
- .build();
-
- node =
- nodeBuilder()
- .settings(settings)
- .client(true)
- .data(false)
- .node();
-
- client = node.client();
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Created Elasticsearch Client {} from embedded Node", client);
- }
-
- } else {
- Settings settings = ImmutableSettings.settingsBuilder()
- .put(userConfig)
- .build();
-
- TransportClient transportClient = new TransportClient(settings);
- for (TransportAddress transport: transportNodes) {
- transportClient.addTransportAddress(transport);
- }
-
- // verify that we actually are connected to a cluster
- ImmutableList<DiscoveryNode> nodes = transportClient.connectedNodes();
- if (nodes.isEmpty()) {
- throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connected to nodes: " + nodes.toString());
- }
- }
-
- client = transportClient;
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Created Elasticsearch TransportClient {}", client);
- }
- }
-
- BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
- client,
- new BulkProcessor.Listener() {
- @Override
- public void beforeBulk(long executionId,
- BulkRequest request) {
-
- }
-
- @Override
- public void afterBulk(long executionId,
- BulkRequest request,
- BulkResponse response) {
- if (response.hasFailures()) {
- for (BulkItemResponse itemResp : response.getItems()) {
- if (itemResp.isFailed()) {
- LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
- failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
- }
- }
- hasFailure.set(true);
- }
- }
-
- @Override
- public void afterBulk(long executionId,
- BulkRequest request,
- Throwable failure) {
- LOG.error(failure.getMessage());
- failureThrowable.compareAndSet(null, failure);
- hasFailure.set(true);
- }
- });
-
- // This makes flush() blocking
- bulkProcessorBuilder.setConcurrentRequests(0);
-
- ParameterTool params = ParameterTool.fromMap(userConfig);
-
- if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
- bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
- }
-
- if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
- bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
- CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
- }
-
- if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
- bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
- }
-
- bulkProcessor = bulkProcessorBuilder.build();
- }
-
- @Override
- public void invoke(T element) {
- IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Emitting IndexRequest: {}", indexRequest);
- }
-
- bulkProcessor.add(indexRequest);
+ public ElasticsearchSink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+ super(new Elasticsearch1ApiCallBridge(), userConfig, elasticsearchSinkFunction);
}
- @Override
- public void close() {
- if (bulkProcessor != null) {
- bulkProcessor.close();
- bulkProcessor = null;
- }
-
- if (client != null) {
- client.close();
- }
-
- if (node != null) {
- node.close();
- }
-
- if (hasFailure.get()) {
- Throwable cause = failureThrowable.get();
- if (cause != null) {
- throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
- } else {
- throw new RuntimeException("An error occured in ElasticsearchSink.");
-
- }
- }
+ /**
+ * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+ *
+ * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+ * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+ * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest ActionRequests} from the incoming element
+ */
+ public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+ super(new Elasticsearch1ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
index 04ae40a..18aa11e 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -51,7 +51,11 @@ import java.io.Serializable;
* }</pre>
*
* @param <T> The type of the element handled by this {@code IndexRequestBuilder}
+ *
+ * @deprecated Deprecated since version 1.2, to be removed at version 2.0.
+ * Please create an {@link ElasticsearchSink} using an {@link ElasticsearchSinkFunction} instead.
*/
+@Deprecated
public interface IndexRequestBuilder<T> extends Function, Serializable {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java
new file mode 100644
index 0000000..6f1d138
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilderWrapperFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * A dummy {@link ElasticsearchSinkFunction} that wraps an {@link IndexRequestBuilder}.
+ * This serves as a bridge for the usage deprecation of the {@code IndexRequestBuilder} interface.
+ */
+class IndexRequestBuilderWrapperFunction<T> implements ElasticsearchSinkFunction<T> {
+
+ private static final long serialVersionUID = 289876038414250101L;
+
+ private final IndexRequestBuilder<T> indexRequestBuilder;
+
+ IndexRequestBuilderWrapperFunction(IndexRequestBuilder<T> indexRequestBuilder) {
+ this.indexRequestBuilder = indexRequestBuilder;
+ }
+
+ @Override
+ public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
+ indexer.add(indexRequestBuilder.createIndexRequest(element, ctx));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
index 33a2e47..3a7b113 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,177 +18,149 @@
package org.apache.flink.streaming.connectors.elasticsearch;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.node.Node;
-import org.junit.Assert;
-import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import java.io.File;
+import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
-public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
+ @Test
+ public void testTransportClient() throws Exception {
+ runTransportClientTest();
+ }
- private static final int NUM_ELEMENTS = 20;
+ @Test
+ public void testNullTransportClient() throws Exception {
+ runNullTransportClientTest();
+ }
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
+ @Test
+ public void testEmptyTransportClient() throws Exception {
+ runEmptyTransportClientTest();
+ }
@Test
- public void testNodeClient() throws Exception{
+ public void testTransportClientFails() throws Exception{
+ runTransportClientFailsTest();
+ }
- File dataDir = tempFolder.newFolder();
+ // -- Tests specific to Elasticsearch 1.x --
- Node node = nodeBuilder()
- .settings(ImmutableSettings.settingsBuilder()
- .put("http.enabled", false)
- .put("path.data", dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that user config works correctly
- .clusterName("my-node-client-cluster")
- .local(true)
- .node();
+ /**
+ * Tests that the Elasticsearch sink works properly using an embedded node to connect to Elasticsearch.
+ */
+ @Test
+ public void testEmbeddedNode() throws Exception {
+ final String index = "embedded-node-test-index";
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+ DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
- Map<String, String> config = Maps.newHashMap();
+ Map<String, String> userConfig = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-node-client-cluster");
-
- // connect to our local node
- config.put("node.local", "true");
+ userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+ userConfig.put("cluster.name", CLUSTER_NAME);
+ userConfig.put("node.local", "true");
- source.addSink(new ElasticsearchSink<>(config, new TestIndexRequestBuilder()));
-
- env.execute("Elasticsearch Node Client Test");
+ source.addSink(new ElasticsearchSink<>(
+ userConfig,
+ new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index))
+ );
+ env.execute("Elasticsearch Embedded Node Test");
// verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new GetRequest("my-index",
- "my-type",
- Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i, response.getSource().get("data"));
- }
+ Client client = embeddedNodeEnv.getClient();
+ SourceSinkDataTestKit.verifyProducedSinkData(client, index);
- node.close();
+ client.close();
}
+ /**
+ * Tests that behaviour of the deprecated {@link IndexRequestBuilder} constructor works properly.
+ */
@Test
- public void testTransportClient() throws Exception {
-
- File dataDir = tempFolder.newFolder();
+ public void testDeprecatedIndexRequestBuilderVariant() throws Exception {
+ final String index = "index-req-builder-test-index";
- Node node = nodeBuilder()
- .settings(ImmutableSettings.settingsBuilder()
- .put("http.enabled", false)
- .put("path.data", dataDir.getAbsolutePath()))
- // set a custom cluster name to verify that user config works correctly
- .clusterName("my-node-client-cluster")
- .local(true)
- .node();
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
-
- Map<String, String> config = Maps.newHashMap();
+ Map<String, String> userConfig = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-node-client-cluster");
-
- // connect to our local node
- config.put("node.local", "true");
+ userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+ userConfig.put("cluster.name", CLUSTER_NAME);
+ userConfig.put("node.local", "true");
List<TransportAddress> transports = Lists.newArrayList();
transports.add(new LocalTransportAddress("1"));
- source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder()));
-
- env.execute("Elasticsearch TransportClient Test");
+ source.addSink(new ElasticsearchSink<>(
+ userConfig,
+ transports,
+ new TestIndexRequestBuilder(index))
+ );
+ env.execute("Elasticsearch Deprecated IndexRequestBuilder Bridge Test");
// verify the results
- Client client = node.client();
- for (int i = 0; i < NUM_ELEMENTS; i++) {
- GetResponse response = client.get(new GetRequest("my-index",
- "my-type",
- Integer.toString(i))).actionGet();
- Assert.assertEquals("message #" + i, response.getSource().get("data"));
- }
+ Client client = embeddedNodeEnv.getClient();
+ SourceSinkDataTestKit.verifyProducedSinkData(client, index);
- node.close();
+ client.close();
}
- @Test(expected = JobExecutionException.class)
- public void testTransportClientFails() throws Exception{
- // this checks whether the TransportClient fails early when there is no cluster to
- // connect to. We don't hava such as test for the Node Client version since that
- // one will block and wait for a cluster to come online
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());
+ @Override
+ protected <T> ElasticsearchSinkBase<T> createElasticsearchSink(Map<String, String> userConfig,
+ List<InetSocketAddress> transportAddresses,
+ ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+ return new ElasticsearchSink<>(userConfig, ElasticsearchUtils.convertInetSocketAddresses(transportAddresses), elasticsearchSinkFunction);
+ }
- Map<String, String> config = Maps.newHashMap();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
- config.put("cluster.name", "my-node-client-cluster");
+ @Override
+ protected <T> ElasticsearchSinkBase<T> createElasticsearchSinkForEmbeddedNode(
+ Map<String, String> userConfig, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) throws Exception {
- // connect to our local node
- config.put("node.local", "true");
+ // Elasticsearch 1.x requires this setting when using
+ // LocalTransportAddress to connect to a local embedded node
+ userConfig.put("node.local", "true");
List<TransportAddress> transports = Lists.newArrayList();
transports.add(new LocalTransportAddress("1"));
- source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder()));
-
- env.execute("Elasticsearch Node Client Test");
+ return new ElasticsearchSink<>(
+ userConfig,
+ transports,
+ elasticsearchSinkFunction);
}
- private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+ /**
+ * An {@link IndexRequestBuilder} with equivalent functionality to {@link SourceSinkDataTestKit.TestElasticsearchSinkFunction}.
+ */
+ private static class TestIndexRequestBuilder implements IndexRequestBuilder<Tuple2<Integer, String>> {
private static final long serialVersionUID = 1L;
- private volatile boolean running = true;
+ private final String index;
- @Override
- public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
- for (int i = 0; i < NUM_ELEMENTS && running; i++) {
- ctx.collect(Tuple2.of(i, "message #" + i));
- }
- }
-
- @Override
- public void cancel() {
- running = false;
+ public TestIndexRequestBuilder(String index) {
+ this.index = index;
}
- }
-
- private static class TestIndexRequestBuilder implements IndexRequestBuilder<Tuple2<Integer, String>> {
- private static final long serialVersionUID = 1L;
@Override
public IndexRequest createIndexRequest(Tuple2<Integer, String> element, RuntimeContext ctx) {
@@ -196,10 +168,10 @@ public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
json.put("data", element.f1);
return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .id(element.f0.toString())
- .source(json);
+ .index(index)
+ .type("flink-es-test-type")
+ .id(element.f0.toString())
+ .source(json);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 0000000..a0c809b
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.node.Node;
+
+import java.io.File;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 1.x.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {
+
+ private Node node;
+
+ @Override
+ public void start(File tmpDataFolder, String clusterName) throws Exception {
+ if (node == null) {
+ node = nodeBuilder()
+ .settings(ImmutableSettings.settingsBuilder()
+ .put("http.enabled", false)
+ .put("path.data", tmpDataFolder.getAbsolutePath()))
+ .clusterName(clusterName)
+ .local(true)
+ .node();
+
+ node.start();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (node != null && !node.isClosed()) {
+ node.close();
+ node = null;
+ }
+ }
+
+ @Override
+ public Client getClient() {
+ if (node != null && !node.isClosed()) {
+ return node.client();
+ } else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
deleted file mode 100644
index 136ae77..0000000
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.elasticsearch.examples;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
-import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Requests;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
- * you have a cluster named "elasticsearch" running or change the cluster name in the config map.
- */
-public class ElasticsearchExample {
-
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- for (int i = 0; i < 20 && running; i++) {
- ctx.collect("message #" + i);
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
-
- Map<String, String> config = new HashMap<>();
- // This instructs the sink to emit after every element, otherwise they would be buffered
- config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
-
- source.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
- @Override
- public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
- Map<String, Object> json = new HashMap<>();
- json.put("data", element);
-
- return Requests.indexRequest()
- .index("my-index")
- .type("my-type")
- .source(json);
- }
- }));
-
-
- env.execute("Elasticsearch Example");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
new file mode 100644
index 0000000..d697c3c
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
+ * you have a cluster named "elasticsearch" running or change the cluster name in the config map.
+ */
+public class ElasticsearchSinkExample {
+
+ public static void main(String[] args) throws Exception {
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<String> source = env.generateSequence(0, 20).map(new MapFunction<Long, String>() {
+ @Override
+ public String map(Long value) throws Exception {
+ return "message #" + value;
+ }
+ });
+
+ Map<String, String> userConfig = new HashMap<>();
+ userConfig.put("cluster.name", "elasticsearch");
+ // This instructs the sink to emit after every element, otherwise they would be buffered
+ userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+ List<TransportAddress> transports = new ArrayList<>();
+ transports.add(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+ source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
+ @Override
+ public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+ indexer.add(createIndexRequest(element));
+ }
+ }));
+
+
+ env.execute("Elasticsearch Sink Example");
+ }
+
+ private static IndexRequest createIndexRequest(String element) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element);
+
+ return Requests.indexRequest()
+ .index("my-index")
+ .type("my-type")
+ .id(element)
+ .source(json);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
index dc20726..2055184 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/resources/log4j-test.properties
@@ -16,12 +16,12 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.target=System.err
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/pom.xml b/flink-connectors/flink-connector-elasticsearch2/pom.xml
index 7aba36e..30396dd 100644
--- a/flink-connectors/flink-connector-elasticsearch2/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch2/pom.xml
@@ -52,17 +52,19 @@ under the License.
</dependency>
<dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>${elasticsearch.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+ <version>${project.version}</version>
</dependency>
+ <!-- Override Elasticsearch version in base from 1.x to 2.x -->
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${elasticsearch.version}</version>
</dependency>
- <!-- core dependencies -->
+ <!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -78,6 +80,15 @@ under the License.
<scope>test</scope>
<type>test-jar</type>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-base_2.10</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
deleted file mode 100644
index 650931f..0000000
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch2;
-
-import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.bulk.BulkProcessor;
-
-public class BulkProcessorIndexer implements RequestIndexer {
- private final BulkProcessor bulkProcessor;
-
- public BulkProcessorIndexer(BulkProcessor bulkProcessor) {
- this.bulkProcessor = bulkProcessor;
- }
-
- @Override
- public void add(ActionRequest... actionRequests) {
- for (ActionRequest actionRequest : actionRequests) {
- this.bulkProcessor.add(actionRequest);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
new file mode 100644
index 0000000..9407d9f
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.flink.streaming.connectors.elasticsearch.util.ElasticsearchUtils;
+import org.apache.flink.util.Preconditions;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x.
+ */
+public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge {
+
+ private static final long serialVersionUID = 2638252694744361079L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch2ApiCallBridge.class);
+
+ /**
+ * User-provided transport addresses.
+ *
+ * We are using {@link InetSocketAddress} because {@link TransportAddress} is not serializable in Elasticsearch 2.x.
+ */
+ private final List<InetSocketAddress> transportAddresses;
+
+ Elasticsearch2ApiCallBridge(List<InetSocketAddress> transportAddresses) {
+ Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
+ this.transportAddresses = transportAddresses;
+ }
+
+ @Override
+ public Client createClient(Map<String, String> clientConfig) {
+ Settings settings = Settings.settingsBuilder().put(clientConfig).build();
+
+ TransportClient transportClient = TransportClient.builder().settings(settings).build();
+ for (TransportAddress address : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
+ transportClient.addTransportAddress(address);
+ }
+
+ // verify that we actually are connected to a cluster
+ if (transportClient.connectedNodes().isEmpty()) {
+ throw new RuntimeException("Elasticsearch client is not connected to any Elasticsearch nodes!");
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Created Elasticsearch TransportClient with connected nodes {}", transportClient.connectedNodes());
+ }
+
+ return transportClient;
+ }
+
+ @Override
+ public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+ if (!bulkItemResponse.isFailed()) {
+ return null;
+ } else {
+ return bulkItemResponse.getFailure().getCause();
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ // nothing to cleanup
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
index e839589..a0abc51 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
@@ -16,55 +16,30 @@
*/
package org.apache.flink.streaming.connectors.elasticsearch2;
-import com.google.common.collect.ImmutableList;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.util.Preconditions;
-import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.TimeValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
/**
- * Sink that emits its input elements in bulk to an Elasticsearch cluster.
+ * Elasticsearch 2.x sink that requests multiple {@link ActionRequest ActionRequests}
+ * against a cluster for each incoming element.
*
* <p>
- * When using the second constructor
- * {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)} a {@link TransportClient} will
- * be used.
+ * The sink internally uses a {@link TransportClient} to communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor.
*
* <p>
- * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster
- * can be connected to.
+ * The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found
+ * in the <a href="https://www.elastic.co">Elasticsearch documentation</a>. An important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should emit to.
*
* <p>
- * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
- * {@link TransportClient}. The config keys can be found in the Elasticsearch
- * documentation. An important setting is {@code cluster.name}, this should be set to the name
- * of the cluster that the sink should emit to.
- *
- * <p>
- * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}.
* This will buffer elements before sending a request to the cluster. The behaviour of the
* {@code BulkProcessor} can be configured using these config keys:
* <ul>
@@ -75,183 +50,41 @@ import java.util.concurrent.atomic.AtomicReference;
* </ul>
*
* <p>
- * You also have to provide an {@link RequestIndexer}. This is used to create an
- * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
- * {@link RequestIndexer} for an example.
+ * You also have to provide an {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction}.
+ * This is used to create multiple {@link ActionRequest ActionRequests} for each incoming element. See the class level
+ * documentation of {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} for an example.
*
- * @param <T> Type of the elements emitted by this sink
+ * @param <T> Type of the elements handled by this sink
*/
-public class ElasticsearchSink<T> extends RichSinkFunction<T> {
-
- public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
- public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
- public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T> {
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
-
- /**
- * The user specified config map that we forward to Elasticsearch when we create the Client.
- */
- private final Map<String, String> userConfig;
-
- /**
- * The list of nodes that the TransportClient should connect to. This is null if we are using
- * an embedded Node to get a Client.
- */
- private final List<InetSocketAddress> transportAddresses;
-
- /**
- * The builder that is used to construct an {@link IndexRequest} from the incoming element.
- */
- private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
-
- /**
- * The Client that was either retrieved from a Node or is a TransportClient.
- */
- private transient Client client;
-
- /**
- * Bulk processor that was created using the client
- */
- private transient BulkProcessor bulkProcessor;
-
- /**
- * Bulk {@link org.elasticsearch.action.ActionRequest} indexer
- */
- private transient RequestIndexer requestIndexer;
-
- /**
- * This is set from inside the BulkProcessor listener if there where failures in processing.
- */
- private final AtomicBoolean hasFailure = new AtomicBoolean(false);
-
- /**
- * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
- */
- private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
-
/**
- * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
+ * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
*
- * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor
- * @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient}
- * @param elasticsearchSinkFunction This is used to generate the ActionRequest from the incoming element
+ * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
+ * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+ * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
*
+ * @deprecated Deprecated since 1.2, to be removed in 2.0.
+ * Please use {@link ElasticsearchSink#ElasticsearchSink(Map, List, org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction)} instead.
*/
+ @Deprecated
public ElasticsearchSink(Map<String, String> userConfig, List<InetSocketAddress> transportAddresses, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
- this.userConfig = userConfig;
- this.elasticsearchSinkFunction = elasticsearchSinkFunction;
- Preconditions.checkArgument(transportAddresses != null && transportAddresses.size() > 0);
- this.transportAddresses = transportAddresses;
+ this(userConfig, transportAddresses, new OldNewElasticsearchSinkFunctionBridge<>(elasticsearchSinkFunction));
}
/**
- * Initializes the connection to Elasticsearch by creating a
- * {@link org.elasticsearch.client.transport.TransportClient}.
+ * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
+ *
+ * @param userConfig The map of user settings that are passed when constructing the {@link TransportClient} and {@link BulkProcessor}
+ * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
+ * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest ActionRequests} from the incoming element
*/
- @Override
- public void open(Configuration configuration) {
- List<TransportAddress> transportNodes;
- transportNodes = new ArrayList<>(transportAddresses.size());
- for (InetSocketAddress address : transportAddresses) {
- transportNodes.add(new InetSocketTransportAddress(address));
- }
-
- Settings settings = Settings.settingsBuilder().put(userConfig).build();
-
- TransportClient transportClient = TransportClient.builder().settings(settings).build();
- for (TransportAddress transport: transportNodes) {
- transportClient.addTransportAddress(transport);
- }
-
- // verify that we actually are connected to a cluster
- ImmutableList<DiscoveryNode> nodes = ImmutableList.copyOf(transportClient.connectedNodes());
- if (nodes.isEmpty()) {
- throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
- }
-
- client = transportClient;
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Created Elasticsearch TransportClient {}", client);
- }
-
- BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, new BulkProcessor.Listener() {
- @Override
- public void beforeBulk(long executionId, BulkRequest request) {
-
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
- if (response.hasFailures()) {
- for (BulkItemResponse itemResp : response.getItems()) {
- if (itemResp.isFailed()) {
- LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
- failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
- }
- }
- hasFailure.set(true);
- }
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- LOG.error(failure.getMessage());
- failureThrowable.compareAndSet(null, failure);
- hasFailure.set(true);
- }
- });
-
- // This makes flush() blocking
- bulkProcessorBuilder.setConcurrentRequests(0);
-
- ParameterTool params = ParameterTool.fromMap(userConfig);
-
- if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
- bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
- }
-
- if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
- bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
- CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
- }
-
- if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
- bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
- }
-
- bulkProcessor = bulkProcessorBuilder.build();
- requestIndexer = new BulkProcessorIndexer(bulkProcessor);
- }
-
- @Override
- public void invoke(T element) {
- elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer);
+ public ElasticsearchSink(Map<String, String> userConfig,
+ List<InetSocketAddress> transportAddresses,
+ org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+ super(new Elasticsearch2ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction);
}
-
- @Override
- public void close() {
- if (bulkProcessor != null) {
- bulkProcessor.close();
- bulkProcessor = null;
- }
-
- if (client != null) {
- client.close();
- }
-
- if (hasFailure.get()) {
- Throwable cause = failureThrowable.get();
- if (cause != null) {
- throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
- } else {
- throw new RuntimeException("An error occured in ElasticsearchSink.");
- }
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
index 55ba720..c474390 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java
@@ -1,13 +1,12 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,7 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import java.io.Serializable;
/**
- * Method that creates an {@link org.elasticsearch.action.ActionRequest} from an element in a Stream.
+ * Method that creates multiple {@link org.elasticsearch.action.ActionRequest ActionRequests} from an element in a Stream.
*
* <p>
* This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch.
@@ -54,7 +53,12 @@ import java.io.Serializable;
* }</pre>
*
* @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction}
+ *
+ * @deprecated Deprecated since 1.2, to be removed at 2.0.
+ * This class has been deprecated due to package relocation.
+ * Please use {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} instead.
*/
+@Deprecated
public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
void process(T element, RuntimeContext ctx, RequestIndexer indexer);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java
new file mode 100644
index 0000000..c95fff5
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewElasticsearchSinkFunctionBridge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+/**
+ * An adapter {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction} that bridges
+ * calls from the deprecated {@link ElasticsearchSinkFunction} to the new interface during migration.
+ */
+class OldNewElasticsearchSinkFunctionBridge<T> implements org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction<T> {
+
+ private static final long serialVersionUID = 2415651895272659448L;
+
+ private final ElasticsearchSinkFunction<T> deprecated;
+ private OldNewRequestIndexerBridge reusedRequestIndexerBridge;
+
+ OldNewElasticsearchSinkFunctionBridge(ElasticsearchSinkFunction<T> deprecated) {
+ this.deprecated = deprecated;
+ }
+
+ @Override
+ public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
+ if (reusedRequestIndexerBridge == null) {
+ reusedRequestIndexerBridge = new OldNewRequestIndexerBridge(indexer);
+ }
+ deprecated.process(element, ctx, reusedRequestIndexerBridge);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java
new file mode 100644
index 0000000..f42fb44
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/OldNewRequestIndexerBridge.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch2;
+
+import org.elasticsearch.action.ActionRequest;
+
+/**
+ * An adapter {@link org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer} wrapper that bridges
+ * calls from the deprecated {@link RequestIndexer} to the new interface during migration.
+ */
+class OldNewRequestIndexerBridge implements RequestIndexer {
+
+ private static final long serialVersionUID = 4213982619497149416L;
+
+ private final org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer requestIndexer;
+
+ OldNewRequestIndexerBridge(org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer requestIndexer) {
+ this.requestIndexer = requestIndexer;
+ }
+
+ @Override
+ public void add(ActionRequest... actionRequests) {
+ requestIndexer.add(actionRequests);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
index 144a87b..b2b3de4 100644
--- a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java
@@ -20,6 +20,12 @@ import org.elasticsearch.action.ActionRequest;
import java.io.Serializable;
+/**
+ * @deprecated Deprecated since 1.2, to be removed at 2.0.
+ * This class has been deprecated due to package relocation.
+ * Please use {@link org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer} instead.
+ */
+@Deprecated
public interface RequestIndexer extends Serializable {
void add(ActionRequest... actionRequests);
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5caaef8/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
new file mode 100644
index 0000000..ddf3bd6
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkITCase;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+
+import java.io.File;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 2.x.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment {
+
+ private Node node;
+
+ @Override
+ public void start(File tmpDataFolder, String clusterName) throws Exception {
+ if (node == null) {
+ node = NodeBuilder.nodeBuilder().settings(
+ Settings.settingsBuilder()
+ .put("path.home", tmpDataFolder.getParent())
+ .put("http.enabled", false)
+ .put("path.data", tmpDataFolder.getAbsolutePath()))
+ .clusterName(clusterName)
+ .node();
+
+ node.start();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (node != null && !node.isClosed()) {
+ node.close();
+ node = null;
+ }
+ }
+
+ @Override
+ public Client getClient() {
+ if (node != null && !node.isClosed()) {
+ return node.client();
+ } else {
+ return null;
+ }
+ }
+
+}