You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/07/11 20:12:12 UTC
[2/4] flink git commit: [FLINK-7150] [elasticsearch connector]
Various code cleanups in the ElasticSearch connector
[FLINK-7150] [elasticsearch connector] Various code cleanups in the ElasticSearch connector
- Removes Serializable from the RequestIndexer, because indexers are not required to be
serializable (they are created in open()), and the main implementation
(BulkProcessorIndexer) is not actually serializable anyway.
- Makes BulkFlushBackoffPolicy a static nested class, which avoids pulling the enclosing
outer class instance into serialization and clears various warnings about raw references to the outer class.
This closes #4298
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89fd6359
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89fd6359
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89fd6359
Branch: refs/heads/master
Commit: 89fd63599db7451af081040cdc05bd4dc840b44d
Parents: 4d7d847
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 11 13:01:25 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 11 20:13:37 2017 +0200
----------------------------------------------------------------------
.../streaming/connectors/elasticsearch/BulkProcessorIndexer.java | 2 --
.../connectors/elasticsearch/ElasticsearchSinkBase.java | 2 +-
.../flink/streaming/connectors/elasticsearch/RequestIndexer.java | 4 +---
.../elasticsearch/examples/ElasticsearchSinkExample.java | 1 +
4 files changed, 3 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/89fd6359/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
index 838865a..3e290ff 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
@@ -31,8 +31,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
class BulkProcessorIndexer implements RequestIndexer {
- private static final long serialVersionUID = 6841162943062034253L;
-
private final BulkProcessor bulkProcessor;
private final boolean flushOnCheckpoint;
private final AtomicLong numPendingRequestsRef;
http://git-wip-us.apache.org/repos/asf/flink/blob/89fd6359/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 2ab5a90..c49d726 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -94,7 +94,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
*
* <p>This is a proxy for version specific backoff policies.
*/
- public class BulkFlushBackoffPolicy implements Serializable {
+ public static class BulkFlushBackoffPolicy implements Serializable {
private static final long serialVersionUID = -6022851996101826049L;
http://git-wip-us.apache.org/repos/asf/flink/blob/89fd6359/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
index 4587a80..cfa166e 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
@@ -20,13 +20,11 @@ package org.apache.flink.streaming.connectors.elasticsearch;
import org.elasticsearch.action.ActionRequest;
-import java.io.Serializable;
-
/**
* Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
* them for sending to an Elasticsearch cluster.
*/
-public interface RequestIndexer extends Serializable {
+public interface RequestIndexer {
/**
* Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch.
http://git-wip-us.apache.org/repos/asf/flink/blob/89fd6359/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
index f181032..8a0321d 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
@@ -40,6 +40,7 @@ import java.util.Map;
* This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
* you have a cluster named "elasticsearch" running or change the cluster name in the config map.
*/
+@SuppressWarnings("serial")
public class ElasticsearchSinkExample {
public static void main(String[] args) throws Exception {