You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/07/11 20:12:12 UTC

[2/4] flink git commit: [FLINK-7150] [elasticsearch connector] Various code cleanups in the ElasticSearch connector

[FLINK-7150] [elasticsearch connector] Various code cleanups in the ElasticSearch connector

 - Removes Serializable from the RequestIndexer, because they are neither required to be
   serializable (they are created in open()) nor is the main implementation
   (BulkProcessorIndexer) actually serializable.

 - Makes BulkFlushBackoffPolicy a static inner class, which avoids adding outer class during
   serialization and clears various warnings about raw reference to outer class

This closes #4298


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89fd6359
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89fd6359
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89fd6359

Branch: refs/heads/master
Commit: 89fd63599db7451af081040cdc05bd4dc840b44d
Parents: 4d7d847
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 11 13:01:25 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 11 20:13:37 2017 +0200

----------------------------------------------------------------------
 .../streaming/connectors/elasticsearch/BulkProcessorIndexer.java | 2 --
 .../connectors/elasticsearch/ElasticsearchSinkBase.java          | 2 +-
 .../flink/streaming/connectors/elasticsearch/RequestIndexer.java | 4 +---
 .../elasticsearch/examples/ElasticsearchSinkExample.java         | 1 +
 4 files changed, 3 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/89fd6359/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
index 838865a..3e290ff 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
@@ -31,8 +31,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 class BulkProcessorIndexer implements RequestIndexer {
 
-	private static final long serialVersionUID = 6841162943062034253L;
-
 	private final BulkProcessor bulkProcessor;
 	private final boolean flushOnCheckpoint;
 	private final AtomicLong numPendingRequestsRef;

http://git-wip-us.apache.org/repos/asf/flink/blob/89fd6359/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 2ab5a90..c49d726 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -94,7 +94,7 @@ public abstract class ElasticsearchSinkBase<T> extends RichSinkFunction<T> imple
 	 *
 	 * <p>This is a proxy for version specific backoff policies.
 	 */
-	public class BulkFlushBackoffPolicy implements Serializable {
+	public static class BulkFlushBackoffPolicy implements Serializable {
 
 		private static final long serialVersionUID = -6022851996101826049L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/89fd6359/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
index 4587a80..cfa166e 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
@@ -20,13 +20,11 @@ package org.apache.flink.streaming.connectors.elasticsearch;
 
 import org.elasticsearch.action.ActionRequest;
 
-import java.io.Serializable;
-
 /**
  * Users add multiple {@link ActionRequest ActionRequests} to a {@link RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
-public interface RequestIndexer extends Serializable {
+public interface RequestIndexer {
 
 	/**
 	 * Add multiple {@link ActionRequest} to the indexer to prepare for sending requests to Elasticsearch.

http://git-wip-us.apache.org/repos/asf/flink/blob/89fd6359/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
index f181032..8a0321d 100644
--- a/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
+++ b/flink-connectors/flink-connector-elasticsearch/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchSinkExample.java
@@ -40,6 +40,7 @@ import java.util.Map;
  * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
  * you have a cluster named "elasticsearch" running or change the cluster name in the config map.
  */
+@SuppressWarnings("serial")
 public class ElasticsearchSinkExample {
 
 	public static void main(String[] args) throws Exception {