Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/06 16:15:29 UTC

[GitHub] asfgit closed pull request #6994: [FLINK-10720][tests] Add deployment end-to-end stress test with many …

URL: https://github.com/apache/flink/pull/6994

This is a pull request merged from a forked repository. Because GitHub hides the original diff of such a foreign pull request once it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 3c8d0ad537f..8bd649cc5eb 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -88,7 +88,7 @@
  *         Total duration is (sliding_window_operator.num_events) * (sequence_generator_source.event_time.clock_progress).</li>
  * </ul>
  */
-class DataStreamAllroundTestJobFactory {
+public class DataStreamAllroundTestJobFactory {
 	private static final ConfigOption<String> TEST_SEMANTICS = ConfigOptions
 		.key("test.semantics")
 		.defaultValue("exactly-once")
@@ -201,7 +201,7 @@
 		.key("tumbling_window_operator.num_events")
 		.defaultValue(20L);
 
-	static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
+	public static void setupEnvironment(StreamExecutionEnvironment env, ParameterTool pt) throws Exception {
 
 		// set checkpointing semantics
 		String semantics = pt.get(TEST_SEMANTICS.key(), TEST_SEMANTICS.defaultValue());
diff --git a/flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml b/flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml
new file mode 100644
index 00000000000..fa9444dec1b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+	
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.8-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-heavy-deployment-stress-test</artifactId>
+	<name>flink-heavy-deployment-stress-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.11</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-datastream-allround-test</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.11</artifactId>
+			<version>${project.version}</version>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>HeavyDeploymentStressTestProgram</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>HeavyDeploymentStressTestProgram</finalName>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.deployment.HeavyDeploymentStressTestProgram</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java b/flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java
new file mode 100644
index 00000000000..d65583f98af
--- /dev/null
+++ b/flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.deployment;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+
+/**
+ * End-to-end test for heavy deployment descriptors. This test creates a heavy deployment by producing inflated meta
+ * data for the source's operator state. The state is registered as union state and will be multiplied in deployment.
+ */
+public class HeavyDeploymentStressTestProgram {
+
+	private static final ConfigOption<Integer> NUM_LIST_STATES_PER_OP = ConfigOptions
+		.key("heavy_deployment_test.num_list_states_per_op")
+		.defaultValue(100);
+
+	private static final ConfigOption<Integer> NUM_PARTITIONS_PER_LIST_STATE = ConfigOptions
+		.key("heavy_deployment_test.num_partitions_per_list_state")
+		.defaultValue(100);
+
+	public static void main(String[] args) throws Exception {
+
+		final ParameterTool pt = ParameterTool.fromArgs(args);
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		setupEnvironment(env, pt);
+
+		final int numStates =
+			pt.getInt(NUM_LIST_STATES_PER_OP.key(), NUM_LIST_STATES_PER_OP.defaultValue());
+		final int numPartitionsPerState =
+			pt.getInt(NUM_PARTITIONS_PER_LIST_STATE.key(), NUM_PARTITIONS_PER_LIST_STATE.defaultValue());
+
+		Preconditions.checkState(env.getCheckpointInterval() > 0L, "Checkpointing must be enabled for this test!");
+
+		env.addSource(new SimpleEndlessSourceWithBloatedState(numStates, numPartitionsPerState)).setParallelism(env.getParallelism())
+			.addSink(new DiscardingSink<>()).setParallelism(1);
+
+		env.execute("HeavyDeploymentStressTestProgram");
+	}
+
+	/**
+	 * Source with dummy operator state that results in inflated meta data.
+	 */
+	static class SimpleEndlessSourceWithBloatedState extends RichParallelSourceFunction<String>
+		implements CheckpointedFunction, CheckpointListener {
+
+		private static final long serialVersionUID = 1L;
+
+		private final int numListStates;
+		private final int numPartitionsPerListState;
+
+		private transient volatile boolean isRunning;
+
+		/** Flag to induce failure after we have a valid checkpoint. */
+		private transient volatile boolean readyToFail;
+
+		SimpleEndlessSourceWithBloatedState(int numListStates, int numPartitionsPerListState) {
+			this.numListStates = numListStates;
+			this.numPartitionsPerListState = numPartitionsPerListState;
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) {
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+
+			readyToFail = false;
+
+			if (context.isRestored()) {
+				isRunning = false;
+			} else {
+				isRunning = true;
+
+				OperatorStateStore operatorStateStore = context.getOperatorStateStore();
+				for (int i = 0; i < numListStates; ++i) {
+
+					ListStateDescriptor<String> listStateDescriptor =
+						new ListStateDescriptor<>("test-list-state-" + i, String.class);
+
+					ListState<String> unionListState =
+						operatorStateStore.getUnionListState(listStateDescriptor);
+
+					for (int j = 0; j < numPartitionsPerListState; ++j) {
+						unionListState.add(String.valueOf(j));
+					}
+				}
+			}
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			while (isRunning) {
+
+				if (readyToFail && getRuntimeContext().getIndexOfThisSubtask() == 0) {
+					throw new Exception("Artificial failure.");
+				}
+
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collect("test-element");
+				}
+
+				Thread.sleep(1);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			this.isRunning = false;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) {
+			readyToFail = true;
+		}
+	}
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 07b65f9a1e3..2e1ae305694 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -56,6 +56,7 @@ under the License.
 		<module>flink-streaming-file-sink-test</module>
 		<module>flink-state-evolution-test</module>
 		<module>flink-e2e-test-utils</module>
+		<module>flink-heavy-deployment-stress-test</module>
 	</modules>
 
 	<build>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 832bdeef93a..cbd217df2a4 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -128,5 +128,7 @@ run_test "Running Kerberized YARN on Docker test " "$END_TO_END_DIR/test-scripts
 
 run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh"
 
+run_test "Heavy deployment end-to-end test" "$END_TO_END_DIR/test-scripts/test_heavy_deployment.sh"
+
 printf "\n[PASS] All tests passed\n"
 exit 0
diff --git a/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
new file mode 100755
index 00000000000..895e4a7f302
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_heavy_deployment.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
+
+TEST=flink-heavy-deployment-stress-test
+TEST_PROGRAM_NAME=HeavyDeploymentStressTestProgram
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
+
+set_conf "taskmanager.heap.mb" "256" # 256Mb x 20TMs = 5Gb total heap
+
+set_conf "taskmanager.memory.size" "8" # 8Mb
+set_conf "taskmanager.network.memory.min" "8mb"
+set_conf "taskmanager.network.memory.max" "8mb"
+set_conf "taskmanager.memory.segment-size" "8kb"
+
+set_conf "taskmanager.numberOfTaskSlots" "10" # 10 slots per TM
+
+start_cluster # this also starts 1TM
+start_taskmanagers 19 # 1TM + 19TM = 20TM a 10 slots = 200 slots
+
+# This call will result in a deployment with state meta data of 200 x 200 x 50 union states x each 75 entries.
+# We can scale up the numbers to make the test even heavier.
+$FLINK_DIR/bin/flink run ${TEST_PROGRAM_JAR} \
+--environment.max_parallelism 1024 --environment.parallelism 200 \
+--environment.restart_strategy fixed_delay --environment.restart_strategy.fixed_delay.attempts 3 \
+--state_backend.checkpoint_directory ${CHECKPOINT_DIR} \
+--heavy_deployment_test.num_list_states_per_op 50 --heavy_deployment_test.num_partitions_per_list_state 75

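For readers skimming the diff above: the "heaviness" of the test program comes from Flink's union list state. Below is a minimal sketch of that mechanism, using only the public CheckpointedFunction / OperatorStateStore API that the PR itself uses; the class name and state name are made up for illustration and are not part of the PR.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class UnionStateSketch implements CheckpointedFunction {

	private transient ListState<String> unionState;

	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
		// Union list state: on restore, every parallel subtask receives the complete
		// union of all partitions written by all subtasks of the previous execution.
		unionState = context.getOperatorStateStore().getUnionListState(
			new ListStateDescriptor<>("sketch-union-state", String.class));

		if (!context.isRestored()) {
			// Each subtask contributes entries on the first run; with parallelism p,
			// a restoring subtask is later handed the entries of all p writers, so the
			// restore metadata grows with parallelism * states * partitions.
			unionState.add("partition-from-this-subtask");
		}
	}

	@Override
	public void snapshotState(FunctionSnapshotContext context) {
		// Nothing to do here: the operator list state filled in initializeState
		// is included in every checkpoint as-is.
	}
}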

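As a rough back-of-the-envelope check of the numbers hard-coded in test_heavy_deployment.sh (the arithmetic and the reading of "heavy" below are my own, beyond the "200 x 200 x 50 union states x each 75 entries" comment in the script):

public class DeploymentSizeEstimate {
	public static void main(String[] args) {
		int parallelism = 200;            // --environment.parallelism
		int listStatesPerOp = 50;         // --heavy_deployment_test.num_list_states_per_op
		int partitionsPerListState = 75;  // --heavy_deployment_test.num_partitions_per_list_state

		// Each of the 200 source subtasks writes 50 * 75 = 3,750 state entries.
		long entriesWrittenPerSubtask = (long) listStatesPerOp * partitionsPerListState;

		// Because the state is union state, every restoring subtask is assigned the
		// entries of all 200 writers: 200 * 3,750 = 750,000 entries per subtask.
		long entriesAssignedPerSubtask = parallelism * entriesWrittenPerSubtask;

		// Across all 200 restoring subtasks that is 200 * 750,000 = 150,000,000
		// entry references carried in the deployment, which is what makes it heavy.
		long totalReferences = parallelism * entriesAssignedPerSubtask;

		System.out.println("entries written per subtask:   " + entriesWrittenPerSubtask);
		System.out.println("entries assigned per subtask:  " + entriesAssignedPerSubtask);
		System.out.println("total references in deployment: " + totalReferences);
	}
}

Scaling any of these three parameters up makes the restore metadata, and therefore the deployment, proportionally heavier, which is what the comment in the script means by making the test "even heavier".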
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services