You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/14 11:32:14 UTC
[flink] branch release-1.11 updated: [FLINK-18583][elasticsearch] Fix Elasticsearch6 sink uses index as document id
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 50f2fb9 [FLINK-18583][elasticsearch] Fix Elasticsearch6 sink uses index as document id
50f2fb9 is described below
commit 50f2fb9fbcfc162f5933a9f06519742de020001e
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Tue Jul 14 19:31:05 2020 +0800
[FLINK-18583][elasticsearch] Fix Elasticsearch6 sink uses index as document id
This closes #12894
---
.../table/Elasticsearch6DynamicSink.java | 2 +-
.../table/Elasticsearch6DynamicSinkITCase.java | 50 ++++++++++++++++------
.../table/Elasticsearch7DynamicSinkITCase.java | 50 ++++++++++++++++------
3 files changed, 73 insertions(+), 29 deletions(-)
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
index b7fd44d..33ddfef 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -220,7 +220,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
String key,
XContentType contentType,
byte[] document) {
- return new IndexRequest(index, docType, index)
+ return new IndexRequest(index, docType, key)
.source(document, contentType);
}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
index b306b34..8a7c180 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -47,6 +47,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
@@ -208,7 +209,15 @@ public class Elasticsearch6DynamicSinkITCase {
12.12f,
(byte) 2,
LocalDate.ofEpochDay(12345),
- LocalDateTime.parse("2012-12-12T12:12:12"))
+ LocalDateTime.parse("2012-12-12T12:12:12")),
+ row(
+ 2L,
+ LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+ "FGHIJK",
+ 13.13f,
+ (byte) 4,
+ LocalDate.ofEpochDay(12345),
+ LocalDateTime.parse("2013-12-12T13:13:13"))
).executeInsert("esTable")
.getJobClient()
.get()
@@ -226,25 +235,38 @@ public class Elasticsearch6DynamicSinkITCase {
.execute()
.actionGet()
.getHits();
- if (hits.getTotalHits() < 1) {
+ if (hits.getTotalHits() < 2) {
Thread.sleep(200);
}
- } while (hits.getTotalHits() < 1 && deadline.hasTimeLeft());
+ } while (hits.getTotalHits() < 2 && deadline.hasTimeLeft());
- if (hits.getTotalHits() < 1) {
+ if (hits.getTotalHits() < 2) {
throw new AssertionError("Could not retrieve results from Elasticsearch.");
}
- Map<String, Object> result = hits.getAt(0).getSourceAsMap();
- Map<Object, Object> expectedMap = new HashMap<>();
- expectedMap.put("a", 1);
- expectedMap.put("b", "00:00:12");
- expectedMap.put("c", "ABCDE");
- expectedMap.put("d", 12.12d);
- expectedMap.put("e", 2);
- expectedMap.put("f", "2003-10-20");
- expectedMap.put("g", "2012-12-12 12:12:12");
- assertThat(result, equalTo(expectedMap));
+ HashSet<Map<String, Object>> resultSet = new HashSet<>();
+ resultSet.add(hits.getAt(0).getSourceAsMap());
+ resultSet.add(hits.getAt(1).getSourceAsMap());
+ Map<Object, Object> expectedMap1 = new HashMap<>();
+ expectedMap1.put("a", 1);
+ expectedMap1.put("b", "00:00:12");
+ expectedMap1.put("c", "ABCDE");
+ expectedMap1.put("d", 12.12d);
+ expectedMap1.put("e", 2);
+ expectedMap1.put("f", "2003-10-20");
+ expectedMap1.put("g", "2012-12-12 12:12:12");
+ Map<Object, Object> expectedMap2 = new HashMap<>();
+ expectedMap2.put("a", 2);
+ expectedMap2.put("b", "00:00:12");
+ expectedMap2.put("c", "FGHIJK");
+ expectedMap2.put("d", 13.13d);
+ expectedMap2.put("e", 4);
+ expectedMap2.put("f", "2003-10-20");
+ expectedMap2.put("g", "2013-12-12 13:13:13");
+ HashSet<Map<Object, Object>> expectedSet = new HashSet<>();
+ expectedSet.add(expectedMap1);
+ expectedSet.add(expectedMap2);
+ assertThat(resultSet, equalTo(expectedSet));
}
@Test
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
index 483a783..b011e5b 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -47,6 +47,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
@@ -200,7 +201,15 @@ public class Elasticsearch7DynamicSinkITCase {
12.12f,
(byte) 2,
LocalDate.ofEpochDay(12345),
- LocalDateTime.parse("2012-12-12T12:12:12"))
+ LocalDateTime.parse("2012-12-12T12:12:12")),
+ row(
+ 2L,
+ LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+ "FGHIJK",
+ 13.13f,
+ (byte) 4,
+ LocalDate.ofEpochDay(12345),
+ LocalDateTime.parse("2013-12-12T13:13:13"))
).executeInsert("esTable")
.getJobClient()
.get()
@@ -218,25 +227,38 @@ public class Elasticsearch7DynamicSinkITCase {
.execute()
.actionGet()
.getHits();
- if (hits.getTotalHits().value < 1) {
+ if (hits.getTotalHits().value < 2) {
Thread.sleep(200);
}
- } while (hits.getTotalHits().value < 1 && deadline.hasTimeLeft());
+ } while (hits.getTotalHits().value < 2 && deadline.hasTimeLeft());
- if (hits.getTotalHits().value < 1) {
+ if (hits.getTotalHits().value < 2) {
throw new AssertionError("Could not retrieve results from Elasticsearch.");
}
- Map<String, Object> result = hits.getAt(0).getSourceAsMap();
- Map<Object, Object> expectedMap = new HashMap<>();
- expectedMap.put("a", 1);
- expectedMap.put("b", "00:00:12");
- expectedMap.put("c", "ABCDE");
- expectedMap.put("d", 12.12d);
- expectedMap.put("e", 2);
- expectedMap.put("f", "2003-10-20");
- expectedMap.put("g", "2012-12-12 12:12:12");
- assertThat(result, equalTo(expectedMap));
+ HashSet<Map<String, Object>> resultSet = new HashSet<>();
+ resultSet.add(hits.getAt(0).getSourceAsMap());
+ resultSet.add(hits.getAt(1).getSourceAsMap());
+ Map<Object, Object> expectedMap1 = new HashMap<>();
+ expectedMap1.put("a", 1);
+ expectedMap1.put("b", "00:00:12");
+ expectedMap1.put("c", "ABCDE");
+ expectedMap1.put("d", 12.12d);
+ expectedMap1.put("e", 2);
+ expectedMap1.put("f", "2003-10-20");
+ expectedMap1.put("g", "2012-12-12 12:12:12");
+ Map<Object, Object> expectedMap2 = new HashMap<>();
+ expectedMap2.put("a", 2);
+ expectedMap2.put("b", "00:00:12");
+ expectedMap2.put("c", "FGHIJK");
+ expectedMap2.put("d", 13.13d);
+ expectedMap2.put("e", 4);
+ expectedMap2.put("f", "2003-10-20");
+ expectedMap2.put("g", "2013-12-12 13:13:13");
+ HashSet<Map<Object, Object>> expectedSet = new HashSet<>();
+ expectedSet.add(expectedMap1);
+ expectedSet.add(expectedMap2);
+ assertThat(resultSet, equalTo(expectedSet));
}
@Test