You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/12 09:37:01 UTC
[3/5] flink git commit: [FLINK-9451][tests] Add scala quickstart
end-to-end test
[FLINK-9451][tests] Add scala quickstart end-to-end test
This closes #6089.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42e90c77
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42e90c77
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42e90c77
Branch: refs/heads/master
Commit: 42e90c77319eeb4c1a073a66449af3dde016d61d
Parents: 89ab475
Author: Yadan.JS <y_...@yahoo.com>
Authored: Mon May 28 12:16:40 2018 -0400
Committer: zentol <ch...@apache.org>
Committed: Tue Jun 12 11:34:38 2018 +0200
----------------------------------------------------------------------
.../flink-quickstart-test/pom.xml | 63 +++++++++++++
.../test/Elasticsearch5SinkExample.java | 94 ++++++++++++++++++++
.../test/Elasticsearch5SinkExample.scala | 77 ++++++++++++++++
flink-end-to-end-tests/pom.xml | 1 +
flink-end-to-end-tests/run-nightly-tests.sh | 7 +-
.../test-scripts/elasticsearch-common.sh | 5 +-
.../test-scripts/test_quickstarts.sh | 70 +++++++++------
.../test_streaming_elasticsearch.sh | 4 +-
8 files changed, 289 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/flink-quickstart-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-quickstart-test/pom.xml b/flink-end-to-end-tests/flink-quickstart-test/pom.xml
new file mode 100644
index 0000000..46b402d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-quickstart-test/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <version>1.6-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <!-- Holds the job sources that test-scripts/test_quickstarts.sh copies into a project generated from a quickstart archetype. -->
+ <artifactId>flink-quickstart-test</artifactId>
+ <name>flink-quickstart-test</name>
+ <packaging>jar</packaging>
+ <!-- NOTE(review): test_quickstarts.sh reads the flink-connector-elasticsearch artifactId line from this module's target/dependency-reduced-pom.xml; keep that artifactId element on a line of its own. -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-quickstart-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-quickstart-scala</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.java b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.java
new file mode 100644
index 0000000..900e495
--- /dev/null
+++ b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstarts.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Quickstart end-to-end job that writes generated messages to a local Elasticsearch 5.x node.
+ */
+public class Elasticsearch5SinkExample {
+
+    public static void main(String[] args) throws Exception {
+        final ParameterTool params = ParameterTool.fromArgs(args);
+
+        // Fewer than three parsed parameters: print the usage hint and stop without running a job.
+        if (params.getNumberOfParameters() < 3) {
+            System.out.println("Missing parameters!\n" +
+                "Usage: --numRecords <numRecords> --index <index> --type <type>");
+            return;
+        }
+
+        // A bulk size of one makes the sink emit after every element instead of buffering.
+        final Map<String, String> sinkConfig = new HashMap<>();
+        sinkConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+        sinkConfig.put("cluster.name", "elasticsearch");
+
+        final List<InetSocketAddress> esNodes = new ArrayList<>();
+        esNodes.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        streamEnv.getConfig().disableSysoutLogging();
+        streamEnv.enableCheckpointing(5000);
+
+        final long lastValue = params.getInt("numRecords") - 1;
+        final DataStream<String> messages = streamEnv.generateSequence(0, lastValue)
+            .map(new MapFunction<Long, String>() {
+                @Override
+                public String map(Long sequenceNumber) throws Exception {
+                    return "message #" + sequenceNumber;
+                }
+            });
+
+        messages.addSink(new ElasticsearchSink<>(sinkConfig, esNodes, new ElasticsearchSinkFunction<String>() {
+            @Override
+            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+                indexer.add(createIndexRequest(element, params));
+            }
+        }));
+
+        streamEnv.execute("Elasticsearch5.x end to end sink test example");
+    }
+
+    /** Builds the index request for one message; the message text is also used as the document id. */
+    private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
+        final String index = parameterTool.getRequired("index");
+        final String type = parameterTool.getRequired("type");
+
+        final Map<String, Object> document = new HashMap<>();
+        document.put("data", element);
+        return Requests.indexRequest().index(index).type(type).id(element).source(document);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.scala
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.scala b/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.scala
new file mode 100644
index 0000000..9e2a115
--- /dev/null
+++ b/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstarts.test
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+
+import scala.collection.JavaConversions.mapAsJavaMap
+
+import java.net.{InetAddress, InetSocketAddress}
+import java.util
+
+
+/** Quickstart end-to-end job that writes generated messages to a local Elasticsearch 5.x node. */
+object Elasticsearch5SinkExample {
+  def main(args: Array[String]) {
+    val parameterTool = ParameterTool.fromArgs(args)
+
+    if (parameterTool.getNumberOfParameters < 3) {
+      println("Missing parameters!\n" + "Usage:" +
+        " --numRecords <numRecords> --index <index> --type <type>")
+      return
+    }
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.disableSysoutLogging
+    env.enableCheckpointing(5000)
+
+    // Honour --numRecords (as the Java example does) instead of always emitting 20 records.
+    val source: DataStream[(String)] = env
+      .generateSequence(0, parameterTool.getInt("numRecords") - 1)
+      .map(v => "message #" + v.toString)
+
+    val config = new util.HashMap[String, String]
+    // Emit after every element, otherwise requests would be buffered.
+    config.put("bulk.flush.max.actions", "1")
+    config.put("cluster.name", "elasticsearch") // default cluster name: elasticsearch
+
+    val transports = new util.ArrayList[InetSocketAddress]
+    transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
+
+    source.addSink(new ElasticsearchSink(config, transports,
+      new ElasticsearchSinkFunction[(String)] {
+        def createIndexRequest(element: (String)): IndexRequest = {
+          val json2 = Map("data" -> element.asInstanceOf[AnyRef])
+          // Use the element as the document id, exactly like the Java variant of this example.
+          Requests.indexRequest.index(parameterTool.getRequired("index"))
+            .`type`(parameterTool.getRequired("type")).id(element).source(mapAsJavaMap(json2))
+        }
+
+        override def process(element: (String), ctx: RuntimeContext, indexer: RequestIndexer) {
+          indexer.add(createIndexRequest(element))
+        }
+      }))
+
+    env.execute("Elasticsearch5.x end to end sink test example")
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 581abc8..8fb7eb8 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -47,6 +47,7 @@ under the License.
<module>flink-elasticsearch1-test</module>
<module>flink-elasticsearch2-test</module>
<module>flink-elasticsearch5-test</module>
+ <module>flink-quickstart-test</module>
</modules>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 39007f5..46b2609 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -191,7 +191,12 @@ if [ $EXIT_CODE == 0 ]; then
fi
if [ $EXIT_CODE == 0 ]; then
- run_test "Quickstarts nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh"
+ run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
+ EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+ run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
EXIT_CODE=$?
fi
http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 0ef6d55..87ffa82 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -56,13 +56,14 @@ function verify_elasticsearch_process_exist {
function verify_result {
local numRecords=$1
+ local index=$2
if [ -f "$TEST_DATA_DIR/output" ]; then
rm $TEST_DATA_DIR/output
fi
while : ; do
- curl 'localhost:9200/index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output
+ curl "localhost:9200/${index}/_search?q=*&pretty&size=21" > $TEST_DATA_DIR/output
if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ]; then
echo "Elasticsearch end to end test pass."
@@ -75,6 +76,8 @@ function verify_result {
}
function shutdown_elasticsearch_cluster {
+ local index=$1
+ curl -X DELETE "http://localhost:9200/${index}"
pid=$(jps | grep Elasticsearch | awk '{print $1}')
kill -9 $pid
}
http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
index 3db64fc..9657938 100755
--- a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
+++ b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
@@ -18,45 +18,59 @@
################################################################################
# End to end test for quick starts test.
+# Usage:
+# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_quickstarts.sh <Type (java or scala)>
source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/elasticsearch-common.sh
-mkdir -p $TEST_DATA_DIR
+TEST_TYPE=${1:?Usage: test_quickstarts.sh <java|scala>} # abort early instead of building paths from an empty type
+TEST_CLASS_NAME=Elasticsearch5SinkExample
+TEST_FILE_PATH=flink-quickstart-test/src/main/${TEST_TYPE}/org/apache/flink/quickstarts/test/${TEST_CLASS_NAME}.${TEST_TYPE}
+QUICKSTARTS_FILE_PATH=${TEST_DATA_DIR}/flink-quickstart-${TEST_TYPE}/src/main/${TEST_TYPE}/org/apache/flink/quickstart/${TEST_CLASS_NAME}.${TEST_TYPE}
+ES_INDEX=index_${TEST_TYPE}
-cd $TEST_DATA_DIR
-
-mvn archetype:generate \
- -DarchetypeGroupId=org.apache.flink \
- -DarchetypeArtifactId=flink-quickstart-java \
- -DarchetypeVersion=1.6-SNAPSHOT \
- -DgroupId=org.apache.flink.quickstart \
- -DartifactId=flink-java-project \
- -Dversion=0.1 \
- -Dpackage=org.apache.flink.quickstart \
+# get the elasticsearch dependency from flink-quickstart-test
+ES_DEPENDENCY="<dependency>\
+<groupId>org.apache.flink</groupId>\
+$(awk '/flink-connector-elasticsearch/ {print $1}' ${END_TO_END_DIR}/flink-quickstart-test/target/dependency-reduced-pom.xml)\
+<version>\${flink.version}</version>\
+</dependency>"
+
+mkdir -p "${TEST_DATA_DIR}"
+cd "${TEST_DATA_DIR}"
+
+ARTIFACT_ID=flink-quickstart-${TEST_TYPE}
+ARTIFACT_VERSION=0.1
+
+mvn archetype:generate \
+ -DarchetypeGroupId=org.apache.flink \
+ -DarchetypeArtifactId=flink-quickstart-${TEST_TYPE} \
+ -DarchetypeVersion=1.6-SNAPSHOT \
+ -DgroupId=org.apache.flink.quickstart \
+ -DartifactId=${ARTIFACT_ID} \
+ -Dversion=${ARTIFACT_VERSION} \
+ -Dpackage=org.apache.flink.quickstart \
-DinteractiveMode=false
-cd flink-java-project
+cd "${ARTIFACT_ID}"
-# use the Flink Elasticsearch sink example job code in flink-elasticsearch5-tests to simulate modifications to contained job
-cp ${END_TO_END_DIR}/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java $TEST_DATA_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
-sed -i -e 's/package org.apache.flink.streaming.tests;/package org.apache.flink.quickstart;/' $TEST_DATA_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/Elasticsearch5SinkExample.java
+# use the Flink Elasticsearch sink example job code in flink-quickstart-test to simulate modifications to contained job
+cp ${END_TO_END_DIR}/${TEST_FILE_PATH} "$QUICKSTARTS_FILE_PATH"
+sed -i -e 's/package org.apache.flink.quickstarts.test/package org.apache.flink.quickstart/' "${QUICKSTARTS_FILE_PATH}"
position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1)
-sed -i -e ''"$(($position + 1))"'i\
-<dependency>\
-<groupId>org.apache.flink</groupId>\
-<artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>\
-<version>${flink.version}</version>\
-</dependency>' pom.xml
+# Add ElasticSearch dependency to pom.xml
+sed -i -e ''$(($position + 1))'i\
+'${ES_DEPENDENCY}'' pom.xml
-sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.streaming.tests.Elasticsearch5SinkExample/" pom.xml
+sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.$TEST_CLASS_NAME/" pom.xml
mvn clean package -nsu
cd target
-jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+jar tvf "${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar" > contentsInJar.txt # same artifact that TEST_PROGRAM_JAR points to
if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
`grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \
@@ -86,21 +100,21 @@ setup_elasticsearch "https://artifacts.elastic.co/downloads/elasticsearch/elasti
verify_elasticsearch_process_exist
function shutdownAndCleanup {
- shutdown_elasticsearch_cluster
+ shutdown_elasticsearch_cluster "$ES_INDEX"
# make sure to run regular cleanup as well
cleanup
}
trap shutdownAndCleanup INT
trap shutdownAndCleanup EXIT
-TEST_PROGRAM_JAR=$TEST_DATA_DIR/flink-java-project/target/flink-java-project-0.1.jar
+TEST_PROGRAM_JAR=${TEST_DATA_DIR}/${ARTIFACT_ID}/target/${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar
start_cluster
-$FLINK_DIR/bin/flink run -c org.apache.flink.quickstart.Elasticsearch5SinkExample $TEST_PROGRAM_JAR \
+${FLINK_DIR}/bin/flink run -c org.apache.flink.quickstart.Elasticsearch5SinkExample "$TEST_PROGRAM_JAR" \
--numRecords 20 \
- --index index \
+ --index "${ES_INDEX}" \
--type type
-verify_result 20
+verify_result 20 "${ES_INDEX}"
http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
index 10e72e9..7464409 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -31,7 +31,7 @@ verify_elasticsearch_process_exist
start_cluster
function test_cleanup {
- shutdown_elasticsearch_cluster
+ shutdown_elasticsearch_cluster index
# make sure to run regular cleanup as well
cleanup
@@ -48,4 +48,4 @@ $FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \
--index index \
--type type
-verify_result 20
+verify_result 20 index