You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/14 02:45:28 UTC
[flink] branch master updated: [FLINK-18585][elasticsearch] Fix
dynamic index doesn't work in new elasticsearch table sink
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2738307 [FLINK-18585][elasticsearch] Fix dynamic index doesn't work in new elasticsearch table sink
2738307 is described below
commit 273830730c9524278576255c56c17c195f7c6bf9
Author: Leonard Xu <xb...@163.com>
AuthorDate: Tue Jul 14 10:45:02 2020 +0800
[FLINK-18585][elasticsearch] Fix dynamic index doesn't work in new elasticsearch table sink
This closes #12886
---
.../elasticsearch/table/IndexGeneratorFactory.java | 2 +-
.../table/RowElasticsearchSinkFunction.java | 5 +++
.../table/Elasticsearch6DynamicSinkITCase.java | 37 ++++++++++++++++++++++
.../table/Elasticsearch7DynamicSinkITCase.java | 37 ++++++++++++++++++++++
4 files changed, 80 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
index e60be72..692b1fe 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
@@ -55,7 +55,7 @@ import java.util.regex.Pattern;
* convert a field value of TIMESTAMP/DATE/TIME type into the format specified by date_format_string. The
* date_format_string is compatible with {@link java.text.SimpleDateFormat}. For example, if the option
* value is 'myusers_{log_ts|yyyy-MM-dd}', then a record with log_ts field value 2020-03-27 12:25:55 will
- * be written into "myusers-2020-03-27" index.
+ * be written into "myusers_2020-03-27" index.
*/
@Internal
final class IndexGeneratorFactory {
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
index 4eaba48..76701d8 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -69,6 +69,11 @@ class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData>
}
@Override
+ public void open() {
+ indexGenerator.open();
+ }
+
+ @Override
public void process(
RowData element,
RuntimeContext ctx,
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
index 26cf90a..b306b34 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -247,6 +247,43 @@ public class Elasticsearch6DynamicSinkITCase {
assertThat(result, equalTo(expectedMap));
}
+ @Test
+ public void testWritingDocumentsWithDynamicIndex() throws Exception {
+ TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build());
+
+ String index = "dynamic-index-{b|yyyy-MM-dd}";
+ String myType = "MyType";
+ tableEnvironment.executeSql("CREATE TABLE esTable (" +
+ "a BIGINT NOT NULL,\n" +
+ "b TIMESTAMP NOT NULL,\n" +
+ "PRIMARY KEY (a) NOT ENFORCED\n" +
+ ")\n" +
+ "WITH (\n" +
+ String.format("'%s'='%s',\n", "connector", "elasticsearch-6") +
+ String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) +
+ String.format("'%s'='%s',\n", ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType) +
+ String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") +
+ String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+ ")");
+
+ tableEnvironment.fromValues(row(1L, LocalDateTime.parse("2012-12-12T12:12:12")))
+ .executeInsert("esTable")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(this.getClass().getClassLoader())
+ .get();
+
+ Client client = elasticsearchResource.getClient();
+ Map<String, Object> response = client.get(new GetRequest("dynamic-index-2012-12-12", myType, "1")).actionGet().getSource();
+ Map<Object, Object> expectedMap = new HashMap<>();
+ expectedMap.put("a", 1);
+ expectedMap.put("b", "2012-12-12 12:12:12");
+ assertThat(response, equalTo(expectedMap));
+ }
+
private static class MockContext implements DynamicTableSink.Context {
@Override
public boolean isBounded() {
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
index 7f41eb7..483a783 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -239,6 +239,43 @@ public class Elasticsearch7DynamicSinkITCase {
assertThat(result, equalTo(expectedMap));
}
+ @Test
+ public void testWritingDocumentsWithDynamicIndex() throws Exception {
+ TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build());
+
+ String index = "dynamic-index-{b|yyyy-MM-dd}";
+ tableEnvironment.executeSql("CREATE TABLE esTable (" +
+ "a BIGINT NOT NULL,\n" +
+ "b TIMESTAMP NOT NULL,\n" +
+ "PRIMARY KEY (a) NOT ENFORCED\n" +
+ ")\n" +
+ "WITH (\n" +
+ String.format("'%s'='%s',\n", "connector", "elasticsearch-7") +
+ String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) +
+ String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") +
+ String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+ ")");
+
+ tableEnvironment.fromValues(row(1L, LocalDateTime.parse("2012-12-12T12:12:12")))
+ .executeInsert("esTable")
+ .getJobClient()
+ .get()
+ .getJobExecutionResult(this.getClass().getClassLoader())
+ .get();
+
+ Client client = elasticsearchResource.getClient();
+ Map<String, Object> response = client.get(new GetRequest("dynamic-index-2012-12-12", "1"))
+ .actionGet()
+ .getSource();
+ Map<Object, Object> expectedMap = new HashMap<>();
+ expectedMap.put("a", 1);
+ expectedMap.put("b", "2012-12-12 12:12:12");
+ assertThat(response, equalTo(expectedMap));
+ }
+
private static class MockContext implements DynamicTableSink.Context {
@Override
public boolean isBounded() {