You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/14 11:32:14 UTC

[flink] branch release-1.11 updated: [FLINK-18583][elasticsearch] Fix Elasticsearch6 sink uses index as document id

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 50f2fb9  [FLINK-18583][elasticsearch] Fix Elasticsearch6 sink uses index as document id
50f2fb9 is described below

commit 50f2fb9fbcfc162f5933a9f06519742de020001e
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Tue Jul 14 19:31:05 2020 +0800

    [FLINK-18583][elasticsearch] Fix Elasticsearch6 sink uses index as document id
    
    This closes #12894
---
 .../table/Elasticsearch6DynamicSink.java           |  2 +-
 .../table/Elasticsearch6DynamicSinkITCase.java     | 50 ++++++++++++++++------
 .../table/Elasticsearch7DynamicSinkITCase.java     | 50 ++++++++++++++++------
 3 files changed, 73 insertions(+), 29 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
index b7fd44d..33ddfef 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -220,7 +220,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
 				String key,
 				XContentType contentType,
 				byte[] document) {
-			return new IndexRequest(index, docType, index)
+			return new IndexRequest(index, docType, key)
 				.source(document, contentType);
 		}
 
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
index b306b34..8a7c180 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -47,6 +47,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
@@ -208,7 +209,15 @@ public class Elasticsearch6DynamicSinkITCase {
 				12.12f,
 				(byte) 2,
 				LocalDate.ofEpochDay(12345),
-				LocalDateTime.parse("2012-12-12T12:12:12"))
+				LocalDateTime.parse("2012-12-12T12:12:12")),
+			row(
+				2L,
+				LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+				"FGHIJK",
+				13.13f,
+				(byte) 4,
+				LocalDate.ofEpochDay(12345),
+				LocalDateTime.parse("2013-12-12T13:13:13"))
 		).executeInsert("esTable")
 			.getJobClient()
 			.get()
@@ -226,25 +235,38 @@ public class Elasticsearch6DynamicSinkITCase {
 				.execute()
 				.actionGet()
 				.getHits();
-			if (hits.getTotalHits() < 1) {
+			if (hits.getTotalHits() < 2) {
 				Thread.sleep(200);
 			}
-		} while (hits.getTotalHits() < 1 && deadline.hasTimeLeft());
+		} while (hits.getTotalHits() < 2 && deadline.hasTimeLeft());
 
-		if (hits.getTotalHits() < 1) {
+		if (hits.getTotalHits() < 2) {
 			throw new AssertionError("Could not retrieve results from Elasticsearch.");
 		}
 
-		Map<String, Object> result = hits.getAt(0).getSourceAsMap();
-		Map<Object, Object> expectedMap = new HashMap<>();
-		expectedMap.put("a", 1);
-		expectedMap.put("b", "00:00:12");
-		expectedMap.put("c", "ABCDE");
-		expectedMap.put("d", 12.12d);
-		expectedMap.put("e", 2);
-		expectedMap.put("f", "2003-10-20");
-		expectedMap.put("g", "2012-12-12 12:12:12");
-		assertThat(result, equalTo(expectedMap));
+		HashSet<Map<String, Object>> resultSet = new HashSet<>();
+		resultSet.add(hits.getAt(0).getSourceAsMap());
+		resultSet.add(hits.getAt(1).getSourceAsMap());
+		Map<Object, Object> expectedMap1 = new HashMap<>();
+		expectedMap1.put("a", 1);
+		expectedMap1.put("b", "00:00:12");
+		expectedMap1.put("c", "ABCDE");
+		expectedMap1.put("d", 12.12d);
+		expectedMap1.put("e", 2);
+		expectedMap1.put("f", "2003-10-20");
+		expectedMap1.put("g", "2012-12-12 12:12:12");
+		Map<Object, Object> expectedMap2 = new HashMap<>();
+		expectedMap2.put("a", 2);
+		expectedMap2.put("b", "00:00:12");
+		expectedMap2.put("c", "FGHIJK");
+		expectedMap2.put("d", 13.13d);
+		expectedMap2.put("e", 4);
+		expectedMap2.put("f", "2003-10-20");
+		expectedMap2.put("g", "2013-12-12 13:13:13");
+		HashSet<Map<Object, Object>> expectedSet = new HashSet<>();
+		expectedSet.add(expectedMap1);
+		expectedSet.add(expectedMap2);
+		assertThat(resultSet, equalTo(expectedSet));
 	}
 
 	@Test
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
index 483a783..b011e5b 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -47,6 +47,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
@@ -200,7 +201,15 @@ public class Elasticsearch7DynamicSinkITCase {
 				12.12f,
 				(byte) 2,
 				LocalDate.ofEpochDay(12345),
-				LocalDateTime.parse("2012-12-12T12:12:12"))
+				LocalDateTime.parse("2012-12-12T12:12:12")),
+			row(
+				2L,
+				LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+				"FGHIJK",
+				13.13f,
+				(byte) 4,
+				LocalDate.ofEpochDay(12345),
+				LocalDateTime.parse("2013-12-12T13:13:13"))
 		).executeInsert("esTable")
 			.getJobClient()
 			.get()
@@ -218,25 +227,38 @@ public class Elasticsearch7DynamicSinkITCase {
 				.execute()
 				.actionGet()
 				.getHits();
-			if (hits.getTotalHits().value < 1) {
+			if (hits.getTotalHits().value < 2) {
 				Thread.sleep(200);
 			}
-		} while (hits.getTotalHits().value < 1 && deadline.hasTimeLeft());
+		} while (hits.getTotalHits().value < 2 && deadline.hasTimeLeft());
 
-		if (hits.getTotalHits().value < 1) {
+		if (hits.getTotalHits().value < 2) {
 			throw new AssertionError("Could not retrieve results from Elasticsearch.");
 		}
 
-		Map<String, Object> result = hits.getAt(0).getSourceAsMap();
-		Map<Object, Object> expectedMap = new HashMap<>();
-		expectedMap.put("a", 1);
-		expectedMap.put("b", "00:00:12");
-		expectedMap.put("c", "ABCDE");
-		expectedMap.put("d", 12.12d);
-		expectedMap.put("e", 2);
-		expectedMap.put("f", "2003-10-20");
-		expectedMap.put("g", "2012-12-12 12:12:12");
-		assertThat(result, equalTo(expectedMap));
+		HashSet<Map<String, Object>> resultSet = new HashSet<>();
+		resultSet.add(hits.getAt(0).getSourceAsMap());
+		resultSet.add(hits.getAt(1).getSourceAsMap());
+		Map<Object, Object> expectedMap1 = new HashMap<>();
+		expectedMap1.put("a", 1);
+		expectedMap1.put("b", "00:00:12");
+		expectedMap1.put("c", "ABCDE");
+		expectedMap1.put("d", 12.12d);
+		expectedMap1.put("e", 2);
+		expectedMap1.put("f", "2003-10-20");
+		expectedMap1.put("g", "2012-12-12 12:12:12");
+		Map<Object, Object> expectedMap2 = new HashMap<>();
+		expectedMap2.put("a", 2);
+		expectedMap2.put("b", "00:00:12");
+		expectedMap2.put("c", "FGHIJK");
+		expectedMap2.put("d", 13.13d);
+		expectedMap2.put("e", 4);
+		expectedMap2.put("f", "2003-10-20");
+		expectedMap2.put("g", "2013-12-12 13:13:13");
+		HashSet<Map<Object, Object>> expectedSet = new HashSet<>();
+		expectedSet.add(expectedMap1);
+		expectedSet.add(expectedMap2);
+		assertThat(resultSet, equalTo(expectedSet));
 	}
 
 	@Test