You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/12 09:37:01 UTC

[3/5] flink git commit: [FLINK-9451][tests] Add scala quickstart end-to-end test

[FLINK-9451][tests] Add scala quickstart end-to-end test

This closes #6089.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42e90c77
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42e90c77
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42e90c77

Branch: refs/heads/master
Commit: 42e90c77319eeb4c1a073a66449af3dde016d61d
Parents: 89ab475
Author: Yadan.JS <y_...@yahoo.com>
Authored: Mon May 28 12:16:40 2018 -0400
Committer: zentol <ch...@apache.org>
Committed: Tue Jun 12 11:34:38 2018 +0200

----------------------------------------------------------------------
 .../flink-quickstart-test/pom.xml               | 63 +++++++++++++
 .../test/Elasticsearch5SinkExample.java         | 94 ++++++++++++++++++++
 .../test/Elasticsearch5SinkExample.scala        | 77 ++++++++++++++++
 flink-end-to-end-tests/pom.xml                  |  1 +
 flink-end-to-end-tests/run-nightly-tests.sh     |  7 +-
 .../test-scripts/elasticsearch-common.sh        |  5 +-
 .../test-scripts/test_quickstarts.sh            | 70 +++++++++------
 .../test_streaming_elasticsearch.sh             |  4 +-
 8 files changed, 289 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/flink-quickstart-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-quickstart-test/pom.xml b/flink-end-to-end-tests/flink-quickstart-test/pom.xml
new file mode 100644
index 0000000..46b402d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-quickstart-test/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-quickstart-test</artifactId>
+	<name>flink-quickstart-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-quickstart-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-quickstart-scala</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.java b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.java
new file mode 100644
index 0000000..900e495
--- /dev/null
+++ b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstarts.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * End to end test for Elasticsearch5Sink.
+ *
+ * <p>Generates {@code numRecords} strings of the form "message #i" and writes each of them
+ * to the Elasticsearch 5 node at 127.0.0.1:9300 (cluster "elasticsearch") using the given
+ * index and type.
+ *
+ * <p>Usage: {@code --numRecords <numRecords> --index <index> --type <type>}
+ */
+public class Elasticsearch5SinkExample {
+
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		// Check the required parameters by name: merely counting them would accept three
+		// unrelated arguments and only fail later, inside the running job.
+		if (!parameterTool.has("numRecords") || !parameterTool.has("index") || !parameterTool.has("type")) {
+			System.out.println("Missing parameters!\n" +
+				"Usage: --numRecords <numRecords> --index <index> --type <type>");
+			return;
+		}
+
+		// Resolve the target index and type once, on the client, instead of for every record.
+		final String index = parameterTool.getRequired("index");
+		final String type = parameterTool.getRequired("type");
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(5000);
+
+		DataStream<String> source = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+			.map(new MapFunction<Long, String>() {
+				@Override
+				public String map(Long value) throws Exception {
+					return "message #" + value;
+				}
+			});
+
+		Map<String, String> userConfig = new HashMap<>();
+		userConfig.put("cluster.name", "elasticsearch");
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		userConfig.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		List<InetSocketAddress> transports = new ArrayList<>();
+		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
+
+		source.addSink(new ElasticsearchSink<>(userConfig, transports, new ElasticsearchSinkFunction<String>() {
+			@Override
+			public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
+				indexer.add(createIndexRequest(element, index, type));
+			}
+		}));
+
+		env.execute("Elasticsearch5.x end to end sink test example");
+	}
+
+	/**
+	 * Builds the index request for a single element, storing it under the field "data".
+	 *
+	 * <p>The element is also used as the document id, so a replayed element addresses the
+	 * same document rather than a new one.
+	 */
+	private static IndexRequest createIndexRequest(String element, String index, String type) {
+		Map<String, Object> json = new HashMap<>();
+		json.put("data", element);
+
+		return Requests.indexRequest()
+			.index(index)
+			.type(type)
+			.id(element)
+			.source(json);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.scala
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.scala b/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.scala
new file mode 100644
index 0000000..9e2a115
--- /dev/null
+++ b/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstarts.test
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
+import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
+import org.elasticsearch.action.index.IndexRequest
+import org.elasticsearch.client.Requests
+
+import java.net.{InetAddress, InetSocketAddress}
+import java.util
+
+/**
+ * End to end test for Elasticsearch5Sink (Scala version of the Java example).
+ *
+ * Generates `numRecords` strings of the form "message #i" and writes each of them to the
+ * Elasticsearch 5 node at 127.0.0.1:9300 (cluster "elasticsearch") using the given index and type.
+ *
+ * Usage: --numRecords <numRecords> --index <index> --type <type>
+ */
+object Elasticsearch5SinkExample {
+  def main(args: Array[String]): Unit = {
+
+    val parameterTool = ParameterTool.fromArgs(args)
+
+    // Check the required parameters by name: merely counting them would accept three
+    // unrelated arguments and only fail later, inside the running job.
+    if (!parameterTool.has("numRecords") || !parameterTool.has("index") ||
+      !parameterTool.has("type")) {
+      println("Missing parameters!\n" + "Usage:" +
+        " --numRecords <numRecords> --index <index> --type <type>")
+      return
+    }
+
+    // Resolve the target index and type once, on the client, instead of for every record.
+    val index = parameterTool.getRequired("index")
+    val indexType = parameterTool.getRequired("type")
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.disableSysoutLogging()
+    env.enableCheckpointing(5000)
+
+    // Emit exactly --numRecords elements; the caller verifies that many documents arrive.
+    val source: DataStream[String] = env.generateSequence(0, parameterTool.getInt("numRecords") - 1)
+      .map(v => "message #" + v.toString)
+
+    val config = new util.HashMap[String, String]
+    // This instructs the sink to emit after every element, otherwise they would be buffered
+    config.put("bulk.flush.max.actions", "1")
+    config.put("cluster.name", "elasticsearch") //default cluster name: elasticsearch
+
+    val transports = new util.ArrayList[InetSocketAddress]
+    transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
+
+    source.addSink(new ElasticsearchSink(config, transports,
+      new ElasticsearchSinkFunction[String] {
+        /** Builds the index request for a single element, storing it under the field "data". */
+        def createIndexRequest(element: String): IndexRequest = {
+          val json = new util.HashMap[String, AnyRef]
+          json.put("data", element)
+
+          // Like the Java example, use the element as document id so that a replayed element
+          // addresses the same document rather than a new one.
+          Requests.indexRequest
+            .index(index)
+            .`type`(indexType)
+            .id(element)
+            .source(json)
+        }
+
+        override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
+          indexer.add(createIndexRequest(element))
+        }
+      }))
+
+    env.execute("Elasticsearch5.x end to end sink test example")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 581abc8..8fb7eb8 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -47,6 +47,7 @@ under the License.
 		<module>flink-elasticsearch1-test</module>
 		<module>flink-elasticsearch2-test</module>
 		<module>flink-elasticsearch5-test</module>
+		<module>flink-quickstart-test</module>
 	</modules>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 39007f5..46b2609 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -191,7 +191,12 @@ if [ $EXIT_CODE == 0 ]; then
 fi
 
 if [ $EXIT_CODE == 0 ]; then
-  run_test "Quickstarts nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh"
+  run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
   EXIT_CODE=$?
 fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 0ef6d55..87ffa82 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -56,13 +56,14 @@ function verify_elasticsearch_process_exist {
 
 function verify_result {
     local numRecords=$1
+    local index=$2
 
     if [ -f "$TEST_DATA_DIR/output" ]; then
         rm $TEST_DATA_DIR/output
     fi
 
     while : ; do
-      curl 'localhost:9200/index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output
+      curl "localhost:9200/${index}/_search?q=*&pretty&size=21" > $TEST_DATA_DIR/output
 
       if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/output)" ]; then
           echo "Elasticsearch end to end test pass."
@@ -75,6 +76,12 @@ function verify_result {
 }
 
 function shutdown_elasticsearch_cluster {
+  local index=$1
+  # Delete the test index only when a name was given; with an empty name the request
+  # would be sent to "http://localhost:9200/" instead of a specific index.
+  if [ -n "${index}" ]; then
+    curl -X DELETE "http://localhost:9200/${index}"
+  fi
   pid=$(jps | grep Elasticsearch | awk '{print $1}')
   kill -9 $pid
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
index 3db64fc..9657938 100755
--- a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
+++ b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh
@@ -18,45 +18,68 @@
 ################################################################################
 
 # End to end test for quick starts test.
+# Usage:
+# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_quickstarts.sh <Type (java or scala)>
 
 source "$(dirname "$0")"/common.sh
 source "$(dirname "$0")"/elasticsearch-common.sh
 
-mkdir -p $TEST_DATA_DIR
+TEST_TYPE=$1
+if [[ "${TEST_TYPE}" != "java" && "${TEST_TYPE}" != "scala" ]]; then
+  echo "Usage: FLINK_DIR=<flink dir> test_quickstarts.sh <java|scala>"
+  exit 1
+fi
+TEST_CLASS_NAME=Elasticsearch5SinkExample
+TEST_FILE_PATH=flink-quickstart-test/src/main/${TEST_TYPE}/org/apache/flink/quickstarts/test/${TEST_CLASS_NAME}.${TEST_TYPE}
+QUICKSTARTS_FILE_PATH=${TEST_DATA_DIR}/flink-quickstart-${TEST_TYPE}/src/main/${TEST_TYPE}/org/apache/flink/quickstart/${TEST_CLASS_NAME}.${TEST_TYPE}
+ES_INDEX=index_${TEST_TYPE}
 
-cd $TEST_DATA_DIR
-
-mvn archetype:generate                             \
-    -DarchetypeGroupId=org.apache.flink            \
-    -DarchetypeArtifactId=flink-quickstart-java    \
-    -DarchetypeVersion=1.6-SNAPSHOT                \
-    -DgroupId=org.apache.flink.quickstart          \
-    -DartifactId=flink-java-project                \
-    -Dversion=0.1                                  \
-    -Dpackage=org.apache.flink.quickstart          \
+# get the elasticsearch dependency from flink-quickstart-test
+ES_ARTIFACT=$(awk '/flink-connector-elasticsearch/ {print $1}' "${END_TO_END_DIR}/flink-quickstart-test/target/dependency-reduced-pom.xml")
+if [ -z "${ES_ARTIFACT}" ]; then
+  echo "Could not find the Elasticsearch connector in flink-quickstart-test/target/dependency-reduced-pom.xml. Was the module built?"
+  exit 1
+fi
+ES_DEPENDENCY="<dependency>\
+<groupId>org.apache.flink</groupId>\
+${ES_ARTIFACT}\
+<version>\${flink.version}</version>\
+</dependency>"
+
+mkdir -p "${TEST_DATA_DIR}"
+cd "${TEST_DATA_DIR}"
+
+ARTIFACT_ID=flink-quickstart-${TEST_TYPE}
+ARTIFACT_VERSION=0.1
+
+mvn archetype:generate                                   \
+    -DarchetypeGroupId=org.apache.flink                  \
+    -DarchetypeArtifactId=flink-quickstart-${TEST_TYPE}  \
+    -DarchetypeVersion=1.6-SNAPSHOT                      \
+    -DgroupId=org.apache.flink.quickstart                \
+    -DartifactId=${ARTIFACT_ID}                          \
+    -Dversion=${ARTIFACT_VERSION}                        \
+    -Dpackage=org.apache.flink.quickstart                \
     -DinteractiveMode=false
 
-cd flink-java-project
+cd "${ARTIFACT_ID}"
 
-# use the Flink Elasticsearch sink example job code in flink-elasticsearch5-tests to simulate modifications to contained job
-cp ${END_TO_END_DIR}/flink-elasticsearch5-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch5SinkExample.java $TEST_DATA_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
-sed -i -e 's/package org.apache.flink.streaming.tests;/package org.apache.flink.quickstart;/' $TEST_DATA_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/Elasticsearch5SinkExample.java
+# use the Flink Elasticsearch sink example job code in flink-quickstart-test to simulate modifications to contained job
+cp "${END_TO_END_DIR}/${TEST_FILE_PATH}" "${QUICKSTARTS_FILE_PATH}"
+sed -i -e 's/package org.apache.flink.quickstarts.test/package org.apache.flink.quickstart/' "${QUICKSTARTS_FILE_PATH}"
 
 position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1)
 
-sed -i -e ''"$(($position + 1))"'i\
-<dependency>\
-<groupId>org.apache.flink</groupId>\
-<artifactId>flink-connector-elasticsearch5_${scala.binary.version}</artifactId>\
-<version>${flink.version}</version>\
-</dependency>' pom.xml
+# Add ElasticSearch dependency to pom.xml
+sed -i -e "$((position + 1))i\\
+${ES_DEPENDENCY}" pom.xml
 
-sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.streaming.tests.Elasticsearch5SinkExample/" pom.xml
+sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.${TEST_CLASS_NAME}/" pom.xml
 
 mvn clean package -nsu
 
 cd target
-jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+jar tvf "${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar" > contentsInJar.txt
 
 if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
       `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \
@@ -86,21 +109,21 @@ setup_elasticsearch "https://artifacts.elastic.co/downloads/elasticsearch/elasti
 verify_elasticsearch_process_exist
 
 function shutdownAndCleanup {
-    shutdown_elasticsearch_cluster
+    shutdown_elasticsearch_cluster "$ES_INDEX"
 
     # make sure to run regular cleanup as well
     cleanup
 }
 trap shutdownAndCleanup INT
 trap shutdownAndCleanup EXIT
 
-TEST_PROGRAM_JAR=$TEST_DATA_DIR/flink-java-project/target/flink-java-project-0.1.jar
+TEST_PROGRAM_JAR=${TEST_DATA_DIR}/${ARTIFACT_ID}/target/${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar
 
 start_cluster
 
-$FLINK_DIR/bin/flink run -c org.apache.flink.quickstart.Elasticsearch5SinkExample $TEST_PROGRAM_JAR \
+${FLINK_DIR}/bin/flink run -c "org.apache.flink.quickstart.${TEST_CLASS_NAME}" "$TEST_PROGRAM_JAR" \
   --numRecords 20 \
-  --index index \
+  --index "${ES_INDEX}" \
   --type type
 
-verify_result 20
+verify_result 20 "${ES_INDEX}"

http://git-wip-us.apache.org/repos/asf/flink/blob/42e90c77/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
index 10e72e9..7464409 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh
@@ -31,7 +31,7 @@ verify_elasticsearch_process_exist
 start_cluster
 
 function test_cleanup {
-  shutdown_elasticsearch_cluster
+  shutdown_elasticsearch_cluster index
 
   # make sure to run regular cleanup as well
    cleanup
@@ -48,4 +48,4 @@ $FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \
   --index index \
   --type type
 
-verify_result 20
+verify_result 20 index