You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/14 02:45:28 UTC

[flink] branch master updated: [FLINK-18585][elasticsearch] Fix dynamic index doesn't work in new elasticsearch table sink

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2738307  [FLINK-18585][elasticsearch] Fix dynamic index doesn't work in new elasticsearch table sink
2738307 is described below

commit 273830730c9524278576255c56c17c195f7c6bf9
Author: Leonard Xu <xb...@163.com>
AuthorDate: Tue Jul 14 10:45:02 2020 +0800

    [FLINK-18585][elasticsearch] Fix dynamic index doesn't work in new elasticsearch table sink
    
    This closes #12886
---
 .../elasticsearch/table/IndexGeneratorFactory.java |  2 +-
 .../table/RowElasticsearchSinkFunction.java        |  5 +++
 .../table/Elasticsearch6DynamicSinkITCase.java     | 37 ++++++++++++++++++++++
 .../table/Elasticsearch7DynamicSinkITCase.java     | 37 ++++++++++++++++++++++
 4 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
index e60be72..692b1fe 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
@@ -55,7 +55,7 @@ import java.util.regex.Pattern;
  * convert a field value of TIMESTAMP/DATE/TIME type into the format specified by date_format_string. The
  * date_format_string is compatible with {@link java.text.SimpleDateFormat}. For example, if the option
  * value is 'myusers_{log_ts|yyyy-MM-dd}', then a record with log_ts field value 2020-03-27 12:25:55 will
- * be written into "myusers-2020-03-27" index.
+ * be written into "myusers_2020-03-27" index.
  */
 @Internal
 final class IndexGeneratorFactory {
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
index 4eaba48..76701d8 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -69,6 +69,11 @@ class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData>
 	}
 
 	@Override
+	public void open() {
+		indexGenerator.open();
+	}
+
+	@Override
 	public void process(
 			RowData element,
 			RuntimeContext ctx,
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
index 26cf90a..b306b34 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -247,6 +247,43 @@ public class Elasticsearch6DynamicSinkITCase {
 		assertThat(result, equalTo(expectedMap));
 	}
 
+	@Test
+	public void testWritingDocumentsWithDynamicIndex() throws Exception {
+		TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance()
+			.useBlinkPlanner()
+			.inStreamingMode()
+			.build());
+
+		String index = "dynamic-index-{b|yyyy-MM-dd}";
+		String myType = "MyType";
+		tableEnvironment.executeSql("CREATE TABLE esTable (" +
+			"a BIGINT NOT NULL,\n" +
+			"b TIMESTAMP NOT NULL,\n" +
+			"PRIMARY KEY (a) NOT ENFORCED\n" +
+			")\n" +
+			"WITH (\n" +
+			String.format("'%s'='%s',\n", "connector", "elasticsearch-6") +
+			String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) +
+			String.format("'%s'='%s',\n", ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType) +
+			String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") +
+			String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+			")");
+
+		tableEnvironment.fromValues(row(1L, LocalDateTime.parse("2012-12-12T12:12:12")))
+			.executeInsert("esTable")
+			.getJobClient()
+			.get()
+			.getJobExecutionResult(this.getClass().getClassLoader())
+			.get();
+
+		Client client = elasticsearchResource.getClient();
+		Map<String, Object> response = client.get(new GetRequest("dynamic-index-2012-12-12", myType, "1")).actionGet().getSource();
+		Map<Object, Object> expectedMap = new HashMap<>();
+		expectedMap.put("a", 1);
+		expectedMap.put("b", "2012-12-12 12:12:12");
+		assertThat(response, equalTo(expectedMap));
+	}
+
 	private static class MockContext implements DynamicTableSink.Context {
 		@Override
 		public boolean isBounded() {
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
index 7f41eb7..483a783 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -239,6 +239,43 @@ public class Elasticsearch7DynamicSinkITCase {
 		assertThat(result, equalTo(expectedMap));
 	}
 
+	@Test
+	public void testWritingDocumentsWithDynamicIndex() throws Exception {
+		TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance()
+			.useBlinkPlanner()
+			.inStreamingMode()
+			.build());
+
+		String index = "dynamic-index-{b|yyyy-MM-dd}";
+		tableEnvironment.executeSql("CREATE TABLE esTable (" +
+			"a BIGINT NOT NULL,\n" +
+			"b TIMESTAMP NOT NULL,\n" +
+			"PRIMARY KEY (a) NOT ENFORCED\n" +
+			")\n" +
+			"WITH (\n" +
+			String.format("'%s'='%s',\n", "connector", "elasticsearch-7") +
+			String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) +
+			String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") +
+			String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+			")");
+
+		tableEnvironment.fromValues(row(1L, LocalDateTime.parse("2012-12-12T12:12:12")))
+			.executeInsert("esTable")
+			.getJobClient()
+			.get()
+			.getJobExecutionResult(this.getClass().getClassLoader())
+			.get();
+
+		Client client = elasticsearchResource.getClient();
+		Map<String, Object> response = client.get(new GetRequest("dynamic-index-2012-12-12", "1"))
+			.actionGet()
+			.getSource();
+		Map<Object, Object> expectedMap = new HashMap<>();
+		expectedMap.put("a", 1);
+		expectedMap.put("b", "2012-12-12 12:12:12");
+		assertThat(response, equalTo(expectedMap));
+	}
+
 	private static class MockContext implements DynamicTableSink.Context {
 		@Override
 		public boolean isBounded() {