You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/26 11:41:09 UTC

[flink] branch release-1.10 updated: [FLINK-16170][elasticsearch] Fix SearchTemplateRequest ClassNotFoundException when using flink-sql-connector-elasticsearch7

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 2e96e49  [FLINK-16170][elasticsearch] Fix SearchTemplateRequest ClassNotFoundException when using flink-sql-connector-elasticsearch7
2e96e49 is described below

commit 2e96e49e8a1f4b467441c1cb7ace5fb1190a2d94
Author: Leonard Xu <xb...@163.com>
AuthorDate: Thu Mar 26 19:40:46 2020 +0800

    [FLINK-16170][elasticsearch] Fix SearchTemplateRequest ClassNotFoundException when using flink-sql-connector-elasticsearch7
    
    We shouldn't exclude `org.elasticsearch:elasticsearch-geo` and `org.elasticsearch.plugin:lang-mustache-client` when shading.
    
    This closes #11508
---
 .../flink-sql-connector-elasticsearch7/pom.xml     | 14 +++--
 flink-connectors/pom.xml                           |  1 +
 .../flink-sql-client-test/pom.xml                  | 13 +++++
 flink-end-to-end-tests/run-nightly-tests.sh        |  7 ++-
 .../test-scripts/elasticsearch-common.sh           |  2 +-
 .../test-scripts/test_sql_client.sh                | 61 +++++++++++++++-------
 tools/travis/splits/split_misc.sh                  |  7 ++-
 7 files changed, 78 insertions(+), 27 deletions(-)

diff --git a/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml b/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml
index 096f412..2237886 100644
--- a/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml
+++ b/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml
@@ -63,14 +63,10 @@ under the License.
 								</includes>
 								<excludes>
 									<!-- These dependencies are not required. -->
-									<exclude>com.carrotsearch:hppc</exclude>
 									<exclude>com.tdunning:t-digest</exclude>
 									<exclude>joda-time:joda-time</exclude>
 									<exclude>net.sf.jopt-simple:jopt-simple</exclude>
 									<exclude>org.elasticsearch:jna</exclude>
-									<exclude>org.elasticsearch:elasticsearch-geo</exclude>
-									<exclude>org.elasticsearch.plugin:lang-mustache-client</exclude>
-									<exclude>com.github.spullara.mustache.java:compiler</exclude>
 									<exclude>org.hdrhistogram:HdrHistogram</exclude>
 									<exclude>org.yaml:snakeyaml</exclude>
 								</excludes>
@@ -114,6 +110,8 @@ under the License.
 										<exclude>META-INF/services/org.apache.lucene.**</exclude>
 										<exclude>META-INF/services/org.elasticsearch.**</exclude>
 										<exclude>META-INF/LICENSE.txt</exclude>
+										<!-- exclude log4j properties file -->
+										<exclude>Log4j-charsets.properties</exclude>
 									</excludes>
 								</filter>
 							</filters>
@@ -143,6 +141,14 @@ under the License.
 									<pattern>com.fasterxml.jackson</pattern>
 									<shadedPattern>org.apache.flink.elasticsearch7.shaded.com.fasterxml.jackson</shadedPattern>
 								</relocation>
+								<relocation>
+									<pattern>com.carrotsearch.hppc</pattern>
+									<shadedPattern>org.apache.flink.elasticsearch7.shaded.com.carrotsearch.hppc</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>com.github.mustachejava</pattern>
+									<shadedPattern>org.apache.flink.elasticsearch7.shaded.com.github.mustachejava</shadedPattern>
+								</relocation>
 							</relocations>
 						</configuration>
 					</execution>
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 1e40061..9e4e29f 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -96,6 +96,7 @@ under the License.
 			</activation>
 			<modules>
 				<module>flink-sql-connector-elasticsearch6</module>
+				<module>flink-sql-connector-elasticsearch7</module>
 				<module>flink-sql-connector-kafka-0.9</module>
 				<module>flink-sql-connector-kafka-0.10</module>
 				<module>flink-sql-connector-kafka-0.11</module>
diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index d0bc720..13a5b5f 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -103,6 +103,13 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<!-- Used by maven-dependency-plugin -->
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-sql-connector-elasticsearch7_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
 	</dependencies>
 
 	<dependencyManagement>
@@ -206,6 +213,12 @@ under the License.
 									<version>${project.version}</version>
 									<type>jar</type>
 								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-sql-connector-elasticsearch7_${scala.binary.version}</artifactId>
+									<version>${project.version}</version>
+									<type>jar</type>
+								</artifactItem>
 							</artifactItems>
 						</configuration>
 					</execution>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 945c689..94a8fe2 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -158,8 +158,11 @@ run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_D
 run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file" "skip_check_exceptions"
 run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks" "skip_check_exceptions"
 
-run_test "SQL Client end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
-run_test "SQL Client end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
+run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 6"
+run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 7"
+run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 6"
+run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 7"
+
 run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh"
 run_test "SQL Client end-to-end test for Kafka 0.11" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh"
 run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh"
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 0ef278a..8a2afcb 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -74,7 +74,7 @@ function verify_result_line_number {
     while : ; do
       curl "localhost:9200/${index}/_search?q=*&pretty&size=21" > $TEST_DATA_DIR/output || true
 
-      if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ]; then
+      if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ] || [ -n "$(grep "\"value\" : $numRecords" $TEST_DATA_DIR/output)" ]; then
           echo "Elasticsearch end to end test pass."
           break
       else
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index 8eec6ea..5420924 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -20,12 +20,17 @@
 set -Eeuo pipefail
 
 PLANNER="${1:-old}"
+ELASTICSEARCH_VERSION=${2:-6}
 
 KAFKA_VERSION="2.2.0"
 CONFLUENT_VERSION="5.0.0"
 CONFLUENT_MAJOR_VERSION="5.0"
 KAFKA_SQL_VERSION="universal"
 
+ELASTICSEARCH6_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz'
+ELASTICSEARCH7_MAC_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.5.1-darwin-x86_64.tar.gz'
+ELASTICSEARCH7_LINUX_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.5.1-linux-x86_64.tar.gz'
+
 source "$(dirname "$0")"/common.sh
 source "$(dirname "$0")"/kafka_sql_common.sh \
   $KAFKA_VERSION \
@@ -85,6 +90,30 @@ function sql_cleanup() {
 }
 on_exit sql_cleanup
 
+# Picks the download URL for $ELASTICSEARCH_VERSION and the host OS, then sets up Elasticsearch.
+function prepare_elasticsearch {
+  echo "Preparing Elasticsearch(version=$ELASTICSEARCH_VERSION)..."
+  # Elasticsearch offers a different release binary per operating system since version 7.
+  case "$(uname -s)" in
+      Linux*)     OS_TYPE=linux;;
+      Darwin*)    OS_TYPE=mac;;
+      *)          OS_TYPE="UNKNOWN:$(uname -s)";;  # not ${unameOut}: it is never set, fatal under `set -u`
+  esac
+  # Version 6 has a single download URL regardless of OS; version 7 needs the OS-specific one.
+  if [[ "$ELASTICSEARCH_VERSION" == 6 ]]; then
+    DOWNLOAD_URL=$ELASTICSEARCH6_DOWNLOAD_URL
+  elif [[ "$ELASTICSEARCH_VERSION" == 7 ]] && [[ "$OS_TYPE" == "mac" ]]; then
+    DOWNLOAD_URL=$ELASTICSEARCH7_MAC_DOWNLOAD_URL
+  elif [[ "$ELASTICSEARCH_VERSION" == 7 ]] && [[ "$OS_TYPE" == "linux" ]]; then
+    DOWNLOAD_URL=$ELASTICSEARCH7_LINUX_DOWNLOAD_URL
+  else
+    echo "[ERROR] Unsupported elasticsearch version($ELASTICSEARCH_VERSION) for OS: $OS_TYPE"
+    exit 1
+  fi
+  setup_elasticsearch "$DOWNLOAD_URL"
+  wait_elasticsearch_working
+}
+
 # prepare Kafka
 echo "Preparing Kafka..."
 
@@ -96,13 +125,7 @@ create_kafka_json_source test-json
 create_kafka_topic 1 1 test-avro
 
 # prepare Elasticsearch
-echo "Preparing Elasticsearch..."
-
-ELASTICSEARCH_VERSION=6
-DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz'
-
-setup_elasticsearch $DOWNLOAD_URL $ELASTICSEARCH_VERSION
-wait_elasticsearch_working
+prepare_elasticsearch
 
 ################################################################################
 # Prepare Flink
@@ -121,7 +144,7 @@ echo "Testing SQL statements..."
 
 JSON_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "json" )
 KAFKA_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "kafka_" )
-ELASTICSEARCH_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "elasticsearch6" )
+ELASTICSEARCH_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "elasticsearch$ELASTICSEARCH_VERSION" )
 
 # create session environment file
 RESULT=$TEST_DATA_DIR/result
@@ -146,7 +169,7 @@ cat >> $SQL_CONF << EOF
         data-type: BIGINT
     connector:
       type: elasticsearch
-      version: 6
+      version: "$ELASTICSEARCH_VERSION"
       hosts: "http://localhost:9200"
       index: "$ELASTICSEARCH_INDEX"
       document-type: "user"
@@ -167,7 +190,7 @@ cat >> $SQL_CONF << EOF
         data-type: BIGINT
     connector:
       type: elasticsearch
-      version: 6
+      version: "$ELASTICSEARCH_VERSION"
       hosts: "http://localhost:9200"
       index: "$ELASTICSEARCH_INDEX"
       document-type: "user"
@@ -190,7 +213,7 @@ EOF
 
 echo "Executing SQL: Values -> Elasticsearch (upsert)"
 
-SQL_STATEMENT_3=$(cat << EOF
+SQL_STATEMENT_1=$(cat << EOF
 INSERT INTO ElasticsearchUpsertSinkTable
   SELECT user_id, user_name, COUNT(*) AS user_count
   FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob'))
@@ -200,18 +223,20 @@ EOF
 )
 
 JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
-  --library $SQL_JARS_DIR \
+  --jar $KAFKA_SQL_JAR \
+  --jar $JSON_SQL_JAR \
+  --jar $ELASTICSEARCH_SQL_JAR \
   --jar $SQL_TOOLBOX_JAR \
   --environment $SQL_CONF \
-  --update "$SQL_STATEMENT_3" | grep "Job ID:" | sed 's/.* //g')
+  --update "$SQL_STATEMENT_1" | grep "Job ID:" | sed 's/.* //g')
 
 wait_job_terminal_state "$JOB_ID" "FINISHED"
 
-verify_result_hash "SQL Client Elasticsearch Upsert" "$ELASTICSEARCH_INDEX" 3 "21a76360e2a40f442816d940e7071ccf"
+verify_result_line_number 3 "$ELASTICSEARCH_INDEX"
 
 echo "Executing SQL: Values -> Elasticsearch (append, no key)"
 
-SQL_STATEMENT_4=$(cat << EOF
+SQL_STATEMENT_2=$(cat << EOF
 INSERT INTO ElasticsearchAppendSinkTable
   SELECT *
   FROM (
@@ -232,7 +257,7 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
   --jar $ELASTICSEARCH_SQL_JAR \
   --jar $SQL_TOOLBOX_JAR \
   --environment $SQL_CONF \
-  --update "$SQL_STATEMENT_4" | grep "Job ID:" | sed 's/.* //g')
+  --update "$SQL_STATEMENT_2" | grep "Job ID:" | sed 's/.* //g')
 
 wait_job_terminal_state "$JOB_ID" "FINISHED"
 
@@ -241,7 +266,7 @@ verify_result_line_number 9 "$ELASTICSEARCH_INDEX"
 
 echo "Executing SQL: Match recognize -> Elasticsearch"
 
-SQL_STATEMENT_5=$(cat << EOF
+SQL_STATEMENT_3=$(cat << EOF
 INSERT INTO ElasticsearchAppendSinkTable
   SELECT 1 as user_id, T.userName as user_name, cast(1 as BIGINT) as user_count
   FROM (
@@ -265,7 +290,7 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
   --jar $ELASTICSEARCH_SQL_JAR \
   --jar $SQL_TOOLBOX_JAR \
   --environment $SQL_CONF \
-  --update "$SQL_STATEMENT_5" | grep "Job ID:" | sed 's/.* //g')
+  --update "$SQL_STATEMENT_3" | grep "Job ID:" | sed 's/.* //g')
 
 # 3 upsert results and 6 append results and 3 match_recognize results
 verify_result_line_number 12 "$ELASTICSEARCH_INDEX"
diff --git a/tools/travis/splits/split_misc.sh b/tools/travis/splits/split_misc.sh
index a6d00c0..c9102b0 100755
--- a/tools/travis/splits/split_misc.sh
+++ b/tools/travis/splits/split_misc.sh
@@ -73,8 +73,11 @@ fi
 run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file" "skip_check_exceptions"
 run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks" "skip_check_exceptions"
 
-run_test "SQL Client end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
-run_test "SQL Client end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
+run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 6"
+run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 7"
+run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 6"
+run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 7"
+
 if [[ ${PROFILE} != *"jdk11"* ]]; then
   run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh"
   run_test "SQL Client end-to-end test for Kafka 0.11" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh"