You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/26 11:41:09 UTC
[flink] branch release-1.10 updated: [FLINK-16170][elasticsearch]
Fix SearchTemplateRequest ClassNotFoundException when using
flink-sql-connector-elasticsearch7
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 2e96e49 [FLINK-16170][elasticsearch] Fix SearchTemplateRequest ClassNotFoundException when using flink-sql-connector-elasticsearch7
2e96e49 is described below
commit 2e96e49e8a1f4b467441c1cb7ace5fb1190a2d94
Author: Leonard Xu <xb...@163.com>
AuthorDate: Thu Mar 26 19:40:46 2020 +0800
[FLINK-16170][elasticsearch] Fix SearchTemplateRequest ClassNotFoundException when using flink-sql-connector-elasticsearch7
We shouldn't `exclude org.elasticsearch:elasticsearch-geo` and `org.elasticsearch.plugin:lang-mustache-client` when shading.
This closes #11508
---
.../flink-sql-connector-elasticsearch7/pom.xml | 14 +++--
flink-connectors/pom.xml | 1 +
.../flink-sql-client-test/pom.xml | 13 +++++
flink-end-to-end-tests/run-nightly-tests.sh | 7 ++-
.../test-scripts/elasticsearch-common.sh | 2 +-
.../test-scripts/test_sql_client.sh | 61 +++++++++++++++-------
tools/travis/splits/split_misc.sh | 7 ++-
7 files changed, 78 insertions(+), 27 deletions(-)
diff --git a/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml b/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml
index 096f412..2237886 100644
--- a/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml
+++ b/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml
@@ -63,14 +63,10 @@ under the License.
</includes>
<excludes>
<!-- These dependencies are not required. -->
- <exclude>com.carrotsearch:hppc</exclude>
<exclude>com.tdunning:t-digest</exclude>
<exclude>joda-time:joda-time</exclude>
<exclude>net.sf.jopt-simple:jopt-simple</exclude>
<exclude>org.elasticsearch:jna</exclude>
- <exclude>org.elasticsearch:elasticsearch-geo</exclude>
- <exclude>org.elasticsearch.plugin:lang-mustache-client</exclude>
- <exclude>com.github.spullara.mustache.java:compiler</exclude>
<exclude>org.hdrhistogram:HdrHistogram</exclude>
<exclude>org.yaml:snakeyaml</exclude>
</excludes>
@@ -114,6 +110,8 @@ under the License.
<exclude>META-INF/services/org.apache.lucene.**</exclude>
<exclude>META-INF/services/org.elasticsearch.**</exclude>
<exclude>META-INF/LICENSE.txt</exclude>
+ <!-- exclude log4j properties file -->
+ <exclude>Log4j-charsets.properties</exclude>
</excludes>
</filter>
</filters>
@@ -143,6 +141,14 @@ under the License.
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>org.apache.flink.elasticsearch7.shaded.com.fasterxml.jackson</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>com.carrotsearch.hppc</pattern>
+ <shadedPattern>org.apache.flink.elasticsearch7.shaded.com.carrotsearch.hppc</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.github.mustachejava</pattern>
+ <shadedPattern>org.apache.flink.elasticsearch7.shaded.com.github.mustachejava</shadedPattern>
+ </relocation>
</relocations>
</configuration>
</execution>
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 1e40061..9e4e29f 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -96,6 +96,7 @@ under the License.
</activation>
<modules>
<module>flink-sql-connector-elasticsearch6</module>
+ <module>flink-sql-connector-elasticsearch7</module>
<module>flink-sql-connector-kafka-0.9</module>
<module>flink-sql-connector-kafka-0.10</module>
<module>flink-sql-connector-kafka-0.11</module>
diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index d0bc720..13a5b5f 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -103,6 +103,13 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <!-- Used by maven-dependency-plugin -->
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-connector-elasticsearch7_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<dependencyManagement>
@@ -206,6 +213,12 @@ under the License.
<version>${project.version}</version>
<type>jar</type>
</artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-connector-elasticsearch7_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ </artifactItem>
</artifactItems>
</configuration>
</execution>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 945c689..94a8fe2 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -158,8 +158,11 @@ run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_D
run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file" "skip_check_exceptions"
run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks" "skip_check_exceptions"
-run_test "SQL Client end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
-run_test "SQL Client end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
+run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 6"
+run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 7"
+run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 6"
+run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 7"
+
run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh"
run_test "SQL Client end-to-end test for Kafka 0.11" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh"
run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh"
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 0ef278a..8a2afcb 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -74,7 +74,7 @@ function verify_result_line_number {
while : ; do
curl "localhost:9200/${index}/_search?q=*&pretty&size=21" > $TEST_DATA_DIR/output || true
- if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ]; then
+ if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ] || [ -n "$(grep "\"value\" : $numRecords" $TEST_DATA_DIR/output)" ]; then
echo "Elasticsearch end to end test pass."
break
else
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index 8eec6ea..5420924 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -20,12 +20,17 @@
set -Eeuo pipefail
PLANNER="${1:-old}"
+ELASTICSEARCH_VERSION=${2:-6}
KAFKA_VERSION="2.2.0"
CONFLUENT_VERSION="5.0.0"
CONFLUENT_MAJOR_VERSION="5.0"
KAFKA_SQL_VERSION="universal"
+ELASTICSEARCH6_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz'
+ELASTICSEARCH7_MAC_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.5.1-darwin-x86_64.tar.gz'
+ELASTICSEARCH7_LINUX_DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.5.1-linux-x86_64.tar.gz'
+
source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/kafka_sql_common.sh \
$KAFKA_VERSION \
@@ -85,6 +90,30 @@ function sql_cleanup() {
}
on_exit sql_cleanup
+function prepare_elasticsearch {
+ echo "Preparing Elasticsearch(version=$ELASTICSEARCH_VERSION)..."
+ # elastcisearch offers different release binary file for corresponding system since version 7.
+ case "$(uname -s)" in
+ Linux*) OS_TYPE=linux;;
+ Darwin*) OS_TYPE=mac;;
+ *) OS_TYPE="UNKNOWN:${unameOut}"
+ esac
+
+ if [[ "$ELASTICSEARCH_VERSION" == 6 ]]; then
+ DOWNLOAD_URL=$ELASTICSEARCH6_DOWNLOAD_URL
+ elif [[ "$ELASTICSEARCH_VERSION" == 7 ]] && [[ "$OS_TYPE" == "mac" ]]; then
+ DOWNLOAD_URL=$ELASTICSEARCH7_MAC_DOWNLOAD_URL
+ elif [[ "$ELASTICSEARCH_VERSION" == 7 ]] && [[ "$OS_TYPE" == "linux" ]]; then
+ DOWNLOAD_URL=$ELASTICSEARCH7_LINUX_DOWNLOAD_URL
+ else
+ echo "[ERROR] Unsupported elasticsearch version($ELASTICSEARCH_VERSION) for OS: $OS_TYPE"
+ exit 1
+ fi
+
+ setup_elasticsearch $DOWNLOAD_URL
+ wait_elasticsearch_working
+}
+
# prepare Kafka
echo "Preparing Kafka..."
@@ -96,13 +125,7 @@ create_kafka_json_source test-json
create_kafka_topic 1 1 test-avro
# prepare Elasticsearch
-echo "Preparing Elasticsearch..."
-
-ELASTICSEARCH_VERSION=6
-DOWNLOAD_URL='https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz'
-
-setup_elasticsearch $DOWNLOAD_URL $ELASTICSEARCH_VERSION
-wait_elasticsearch_working
+prepare_elasticsearch
################################################################################
# Prepare Flink
@@ -121,7 +144,7 @@ echo "Testing SQL statements..."
JSON_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "json" )
KAFKA_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "kafka_" )
-ELASTICSEARCH_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "elasticsearch6" )
+ELASTICSEARCH_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "elasticsearch$ELASTICSEARCH_VERSION" )
# create session environment file
RESULT=$TEST_DATA_DIR/result
@@ -146,7 +169,7 @@ cat >> $SQL_CONF << EOF
data-type: BIGINT
connector:
type: elasticsearch
- version: 6
+ version: "$ELASTICSEARCH_VERSION"
hosts: "http://localhost:9200"
index: "$ELASTICSEARCH_INDEX"
document-type: "user"
@@ -167,7 +190,7 @@ cat >> $SQL_CONF << EOF
data-type: BIGINT
connector:
type: elasticsearch
- version: 6
+ version: "$ELASTICSEARCH_VERSION"
hosts: "http://localhost:9200"
index: "$ELASTICSEARCH_INDEX"
document-type: "user"
@@ -190,7 +213,7 @@ EOF
echo "Executing SQL: Values -> Elasticsearch (upsert)"
-SQL_STATEMENT_3=$(cat << EOF
+SQL_STATEMENT_1=$(cat << EOF
INSERT INTO ElasticsearchUpsertSinkTable
SELECT user_id, user_name, COUNT(*) AS user_count
FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob'))
@@ -200,18 +223,20 @@ EOF
)
JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
- --library $SQL_JARS_DIR \
+ --jar $KAFKA_SQL_JAR \
+ --jar $JSON_SQL_JAR \
+ --jar $ELASTICSEARCH_SQL_JAR \
--jar $SQL_TOOLBOX_JAR \
--environment $SQL_CONF \
- --update "$SQL_STATEMENT_3" | grep "Job ID:" | sed 's/.* //g')
+ --update "$SQL_STATEMENT_1" | grep "Job ID:" | sed 's/.* //g')
wait_job_terminal_state "$JOB_ID" "FINISHED"
-verify_result_hash "SQL Client Elasticsearch Upsert" "$ELASTICSEARCH_INDEX" 3 "21a76360e2a40f442816d940e7071ccf"
+verify_result_line_number 3 "$ELASTICSEARCH_INDEX"
echo "Executing SQL: Values -> Elasticsearch (append, no key)"
-SQL_STATEMENT_4=$(cat << EOF
+SQL_STATEMENT_2=$(cat << EOF
INSERT INTO ElasticsearchAppendSinkTable
SELECT *
FROM (
@@ -232,7 +257,7 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
--jar $ELASTICSEARCH_SQL_JAR \
--jar $SQL_TOOLBOX_JAR \
--environment $SQL_CONF \
- --update "$SQL_STATEMENT_4" | grep "Job ID:" | sed 's/.* //g')
+ --update "$SQL_STATEMENT_2" | grep "Job ID:" | sed 's/.* //g')
wait_job_terminal_state "$JOB_ID" "FINISHED"
@@ -241,7 +266,7 @@ verify_result_line_number 9 "$ELASTICSEARCH_INDEX"
echo "Executing SQL: Match recognize -> Elasticsearch"
-SQL_STATEMENT_5=$(cat << EOF
+SQL_STATEMENT_3=$(cat << EOF
INSERT INTO ElasticsearchAppendSinkTable
SELECT 1 as user_id, T.userName as user_name, cast(1 as BIGINT) as user_count
FROM (
@@ -265,7 +290,7 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
--jar $ELASTICSEARCH_SQL_JAR \
--jar $SQL_TOOLBOX_JAR \
--environment $SQL_CONF \
- --update "$SQL_STATEMENT_5" | grep "Job ID:" | sed 's/.* //g')
+ --update "$SQL_STATEMENT_3" | grep "Job ID:" | sed 's/.* //g')
# 3 upsert results and 6 append results and 3 match_recognize results
verify_result_line_number 12 "$ELASTICSEARCH_INDEX"
diff --git a/tools/travis/splits/split_misc.sh b/tools/travis/splits/split_misc.sh
index a6d00c0..c9102b0 100755
--- a/tools/travis/splits/split_misc.sh
+++ b/tools/travis/splits/split_misc.sh
@@ -73,8 +73,11 @@ fi
run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file" "skip_check_exceptions"
run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks" "skip_check_exceptions"
-run_test "SQL Client end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
-run_test "SQL Client end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
+run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 6"
+run_test "SQL Client end-to-end test (Old planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old 7"
+run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v6.3.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 6"
+run_test "SQL Client end-to-end test (Blink planner) Elasticsearch (v7.5.1)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink 7"
+
if [[ ${PROFILE} != *"jdk11"* ]]; then
run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh"
run_test "SQL Client end-to-end test for Kafka 0.11" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh"