You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/06/21 11:43:39 UTC
flink git commit: [FLINK-8982][e2e tests] Add E2E Tests for queryable
state.
Repository: flink
Updated Branches:
refs/heads/master 0bdde8377 -> 59a58e56c
[FLINK-8982][e2e tests] Add E2E Tests for queryable state.
Adds one test for normal execution and
one with a TM failure scenario.
This closes #5807.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59a58e56
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59a58e56
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59a58e56
Branch: refs/heads/master
Commit: 59a58e56cc2f5f1015244ef835a014e68675234f
Parents: 0bdde83
Author: Florian Schmidt <fl...@icloud.com>
Authored: Tue Mar 13 14:13:08 2018 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Jun 21 13:42:29 2018 +0200
----------------------------------------------------------------------
.../flink-queryable-state-test/pom.xml | 134 ++++++++++++++
.../streaming/tests/queryablestate/Email.java | 77 ++++++++
.../streaming/tests/queryablestate/EmailId.java | 73 ++++++++
.../tests/queryablestate/EmailInformation.java | 114 ++++++++++++
.../tests/queryablestate/LabelSurrogate.java | 65 +++++++
.../tests/queryablestate/QsConstants.java | 29 +++
.../tests/queryablestate/QsStateClient.java | 123 +++++++++++++
.../tests/queryablestate/QsStateProducer.java | 178 +++++++++++++++++++
flink-end-to-end-tests/pom.xml | 1 +
flink-end-to-end-tests/run-nightly-tests.sh | 3 +
flink-end-to-end-tests/test-scripts/common.sh | 59 +++++-
.../test-scripts/queryable_state_base.sh | 56 ++++++
.../test-scripts/test_queryable_state.sh | 58 ++++++
.../test_queryable_state_restart_tm.sh | 174 ++++++++++++++++++
14 files changed, 1143 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/pom.xml b/flink-end-to-end-tests/flink-queryable-state-test/pom.xml
new file mode 100644
index 0000000..a32deea
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/pom.xml
@@ -0,0 +1,134 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.6-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-queryable-state-test_${scala.binary.version}</artifactId>
+ <name>flink-queryable-state-test</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+
+ <executions>
+ <execution>
+ <id>QsStateProducer</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>QsStateProducer</classifier>
+ <archive>
+ <manifestEntries>
+ <program-class>
+ org.apache.flink.streaming.tests.queryablestate.QsStateProducer
+ </program-class>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+
+ <execution>
+ <id>QsStateClient</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadeTestJar>false</shadeTestJar>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>
+ org.apache.flink.streaming.tests.queryablestate.QsStateClient
+ </mainClass>
+ </transformer>
+ </transformers>
+ <finalName>QsStateClient</finalName>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!--simplify the name of the testing JARs for referring to them in the end-to-end test scripts-->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <id>rename</id>
+ <phase>package</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <copy
+ file="${project.basedir}/target/flink-queryable-state-test_${scala.binary.version}-${project.version}-QsStateProducer.jar"
+ tofile="${project.basedir}/target/QsStateProducer.jar"/>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/Email.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/Email.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/Email.java
new file mode 100644
index 0000000..c98ac6d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/Email.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * Toy email representation.
+ */
+public class Email {
+
+ private EmailId emailId;
+ private Instant timestamp;
+ private String foo;
+ private LabelSurrogate label;
+
+ public Email(EmailId emailId, Instant timestamp, String foo, LabelSurrogate label) {
+ this.emailId = emailId;
+ this.timestamp = timestamp;
+ this.foo = foo;
+ this.label = label;
+ }
+
+ public EmailId getEmailId() {
+ return emailId;
+ }
+
+ public void setEmailId(EmailId emailId) {
+ this.emailId = emailId;
+ }
+
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(Instant timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getFoo() {
+ return foo;
+ }
+
+ public void setFoo(String foo) {
+ this.foo = foo;
+ }
+
+ public LabelSurrogate getLabel() {
+ return label;
+ }
+
+ public void setLabel(LabelSurrogate label) {
+ this.label = label;
+ }
+
+ public String getDate() {
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"));
+ return formatter.format(timestamp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailId.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailId.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailId.java
new file mode 100644
index 0000000..63c6276
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailId.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * POJO representing an EmailId.
+ */
+public class EmailId implements Serializable {
+
+ private static final long serialVersionUID = -5001464312464872467L;
+
+ private String emailId;
+
+ public EmailId() {
+
+ }
+
+ public EmailId(String emailId) {
+ this.emailId = Objects.requireNonNull(emailId);
+ }
+
+ public void setEmailId(String emailId) {
+ this.emailId = emailId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ EmailId emailId1 = (EmailId) o;
+
+ return Objects.equals(emailId, emailId1.emailId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(emailId);
+ }
+
+ public String getEmailId() {
+ return emailId;
+ }
+
+ @Override
+ public String toString() {
+ return "EmailId{" +
+ "emailId='" + emailId + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
new file mode 100644
index 0000000..b1d100a
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * POJO representing some information about an email.
+ */
+public class EmailInformation implements Serializable {
+
+ private static final long serialVersionUID = -8956979869800484909L;
+
+ public void setEmailId(EmailId emailId) {
+ this.emailId = emailId;
+ }
+
+ private EmailId emailId;
+
+ public void setStuff(List<String> stuff) {
+ this.stuff = stuff;
+ }
+
+ private List<String> stuff;
+
+ public void setAsdf(Long asdf) {
+ this.asdf = asdf;
+ }
+
+ private Long asdf = 0L;
+
+ private transient LabelSurrogate label;
+
+ public EmailInformation() {
+
+ }
+
+ public EmailInformation(Email email) {
+ emailId = email.getEmailId();
+ stuff = new ArrayList<>();
+ stuff.add("1");
+ stuff.add("2");
+ stuff.add("3");
+ label = email.getLabel();
+ }
+
+ public EmailId getEmailId() {
+ return emailId;
+ }
+
+ public List<String> getStuff() {
+ return stuff;
+ }
+
+ public Long getAsdf() {
+ return asdf;
+ }
+
+ public LabelSurrogate getLabel() {
+ return label;
+ }
+
+ public void setLabel(LabelSurrogate label) {
+ this.label = label;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ EmailInformation that = (EmailInformation) o;
+ return Objects.equals(emailId, that.emailId) &&
+ Objects.equals(stuff, that.stuff) &&
+ Objects.equals(asdf, that.asdf) &&
+ Objects.equals(label, that.label);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(emailId, stuff, asdf, label);
+ }
+
+ @Override
+ public String toString() {
+ return "EmailInformation{" +
+ "emailId=" + emailId +
+ ", stuff=" + stuff +
+ ", asdf=" + asdf +
+ ", label=" + label +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
new file mode 100644
index 0000000..0977ef0
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+/**
+ * A label surrogate.
+ */
+public class LabelSurrogate {
+
+ private Type type;
+ private String foo;
+
+ public LabelSurrogate(Type type, String foo) {
+ this.type = type;
+ this.foo = foo;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public void setType(Type type) {
+ this.type = type;
+ }
+
+ public String getFoo() {
+ return foo;
+ }
+
+ public void setFoo(String foo) {
+ this.foo = foo;
+ }
+
+ @Override
+ public String toString() {
+ return "LabelSurrogate{" +
+ "type=" + type +
+ ", foo='" + foo + '\'' +
+ '}';
+ }
+
+ /**
+ * An exemplary enum.
+ */
+ public enum Type {
+ FOO,
+ BAR,
+ BAZ
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsConstants.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsConstants.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsConstants.java
new file mode 100644
index 0000000..7b26226
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsConstants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+/**
+ * A class containing the constants used in the end-to-end test.
+ */
+public class QsConstants {
+
+ public static final String QUERY_NAME = "state";
+ public static final String STATE_NAME = "state";
+
+ public static final String KEY = "";
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java
new file mode 100644
index 0000000..b0b8ced
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A simple implementation of a queryable state client.
+ * This client queries the state for a while (~2.5 mins) and prints
+ * out the number of entries that it found in the map state.
+ *
+ * <p>Usage: java -jar QsStateClient.jar --job-id JOB_ID [--host HOST] [--port PORT] [--iterations N]
+ */
+public class QsStateClient {
+
+ private static final int BOOTSTRAP_RETRIES = 240;
+
+ public static void main(final String[] args) throws Exception {
+
+ ParameterTool parameters = ParameterTool.fromArgs(args);
+
+ // setup values
+ String jobId = parameters.getRequired("job-id");
+ String host = parameters.get("host", "localhost");
+ int port = parameters.getInt("port", 9069);
+ int numIterations = parameters.getInt("iterations", 1500);
+
+ QueryableStateClient client = new QueryableStateClient(host, port);
+ client.setExecutionConfig(new ExecutionConfig());
+
+ MapStateDescriptor<EmailId, EmailInformation> stateDescriptor =
+ new MapStateDescriptor<>(
+ QsConstants.STATE_NAME,
+ TypeInformation.of(new TypeHint<EmailId>() {
+
+ }),
+ TypeInformation.of(new TypeHint<EmailInformation>() {
+
+ })
+ );
+
+ // wait for state to exist
+ for (int i = 0; i < BOOTSTRAP_RETRIES; i++) { // ~120s
+ try {
+ getMapState(jobId, client, stateDescriptor);
+ break;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof UnknownKeyOrNamespaceException) {
+ System.err.println("State does not exist yet; sleeping 500ms");
+ Thread.sleep(500L);
+ } else {
+ throw e;
+ }
+ }
+
+ if (i == (BOOTSTRAP_RETRIES - 1)) {
+ throw new RuntimeException("Timeout: state doesn't exist after 120s");
+ }
+ }
+
+ // query state
+ for (int iterations = 0; iterations < numIterations; iterations++) {
+
+ MapState<EmailId, EmailInformation> mapState =
+ getMapState(jobId, client, stateDescriptor);
+
+ int counter = 0;
+ for (Map.Entry<EmailId, EmailInformation> entry: mapState.entries()) {
+ // this is to force deserialization
+ entry.getKey();
+ entry.getValue();
+ counter++;
+ }
+ System.out.println("MapState has " + counter + " entries"); // we look for it in the test
+
+ Thread.sleep(100L);
+ }
+ }
+
+ private static MapState<EmailId, EmailInformation> getMapState(
+ String jobId,
+ QueryableStateClient client,
+ MapStateDescriptor<EmailId, EmailInformation> stateDescriptor) throws InterruptedException, ExecutionException {
+
+ CompletableFuture<MapState<EmailId, EmailInformation>> resultFuture =
+ client.getKvState(
+ JobID.fromHexString(jobId),
+ QsConstants.QUERY_NAME,
+ QsConstants.KEY, // which key of the keyed state to access
+ BasicTypeInfo.STRING_TYPE_INFO,
+ stateDescriptor);
+
+ return resultFuture.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java
new file mode 100644
index 0000000..14eaac3
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * Streaming application that creates an {@link Email} pojo with random ids and increasing
+ * timestamps and passes it to a stateful {@link org.apache.flink.api.common.functions.FlatMapFunction},
+ * where it is exposed as queryable state.
+ */
+public class QsStateProducer {
+
+ public static void main(final String[] args) throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ ParameterTool tool = ParameterTool.fromArgs(args);
+ String tmpPath = tool.getRequired("tmp-dir");
+ String stateBackendType = tool.getRequired("state-backend");
+
+ StateBackend stateBackend;
+ switch (stateBackendType) {
+ case "rocksdb":
+ stateBackend = new RocksDBStateBackend(tmpPath);
+ break;
+ case "fs":
+ stateBackend = new FsStateBackend(tmpPath);
+ break;
+ case "memory":
+ stateBackend = new MemoryStateBackend();
+ break;
+ default:
+ throw new RuntimeException("Unsupported state backend " + stateBackendType);
+ }
+
+ env.setStateBackend(stateBackend);
+ env.enableCheckpointing(1000L);
+ env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+ env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);
+
+ env.addSource(new EmailSource())
+ .keyBy(new KeySelector<Email, String>() {
+
+ private static final long serialVersionUID = -1480525724620425363L;
+
+ @Override
+ public String getKey(Email value) throws Exception {
+ return QsConstants.KEY;
+ }
+ })
+ .flatMap(new TestFlatMap());
+
+ env.execute();
+ }
+
+ private static class EmailSource extends RichSourceFunction<Email> {
+
+ private static final long serialVersionUID = -7286937645300388040L;
+
+ private transient volatile boolean isRunning;
+
+ private transient Random random;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ this.random = new Random();
+ this.isRunning = true;
+ }
+
+ @Override
+ public void run(SourceContext<Email> ctx) throws Exception {
+ // Sleep for 10 seconds on start to allow time to copy jobid
+ Thread.sleep(10000L);
+
+ int types = LabelSurrogate.Type.values().length;
+
+ while (isRunning) {
+ int r = random.nextInt(100);
+
+ final EmailId emailId = new EmailId(Integer.toString(random.nextInt()));
+ final Instant timestamp = Instant.now().minus(Duration.ofDays(1L));
+ final String foo = String.format("foo #%d", r);
+ final LabelSurrogate label = new LabelSurrogate(LabelSurrogate.Type.values()[r % types], "bar");
+
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect(new Email(emailId, timestamp, foo, label));
+ }
+
+ Thread.sleep(30L);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+ }
+
+ private static class TestFlatMap extends RichFlatMapFunction<Email, Object> implements CheckpointedFunction {
+
+ private static final long serialVersionUID = 7821128115999005941L;
+
+ private transient MapState<EmailId, EmailInformation> state;
+ private transient int count;
+
+ @Override
+ public void open(Configuration parameters) {
+ MapStateDescriptor<EmailId, EmailInformation> stateDescriptor =
+ new MapStateDescriptor<>(
+ QsConstants.STATE_NAME,
+ TypeInformation.of(new TypeHint<EmailId>() {
+
+ }),
+ TypeInformation.of(new TypeHint<EmailInformation>() {
+
+ })
+ );
+ stateDescriptor.setQueryable(QsConstants.QUERY_NAME);
+ state = getRuntimeContext().getMapState(stateDescriptor);
+ count = -1;
+ }
+
+ @Override
+ public void flatMap(Email value, Collector<Object> out) throws Exception {
+ state.put(value.getEmailId(), new EmailInformation(value));
+ count = Iterables.size(state.keys());
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) {
+ System.out.println("Count on snapshot: " + count); // we look for it in the test
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) {
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 8fb7eb8..c169050 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -43,6 +43,7 @@ under the License.
<module>flink-distributed-cache-via-blob-test</module>
<module>flink-high-parallelism-iterations-test</module>
<module>flink-stream-stateful-job-upgrade-test</module>
+ <module>flink-queryable-state-test</module>
<module>flink-local-recovery-and-allocation-test</module>
<module>flink-elasticsearch1-test</module>
<module>flink-elasticsearch2-test</module>
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index cf70558..c4c5069 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -43,6 +43,9 @@ echo "Flink distribution directory: $FLINK_DIR"
# run_test "<description>" "$END_TO_END_DIR/test-scripts/<script_name>"
+run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state.sh rocksdb"
+run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh"
+
run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file true false"
run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file false false"
run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false"
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 3498b56..610be2d 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -113,6 +113,9 @@ function create_ha_config() {
#==============================================================================
rest.port: 8081
+
+ query.server.ports: 9000-9009
+ query.proxy.ports: 9010-9019
EOL
}
@@ -137,7 +140,7 @@ function start_local_zk {
address=${BASH_REMATCH[2]}
if [ "${address}" != "localhost" ]; then
- echo "[ERROR] Parse error. Only available for localhost."
+ echo "[ERROR] Parse error. Only available for localhost. Expected address 'localhost' but got '${address}'"
exit 1
fi
${FLINK_DIR}/bin/zookeeper.sh start $id
@@ -167,6 +170,32 @@ function start_cluster {
done
}
+function start_and_wait_for_tm {
+
+ tm_query_result=$(curl -s "http://localhost:8081/taskmanagers")
+
+ # we assume that the cluster is running
+ if ! [[ ${tm_query_result} =~ \{\"taskmanagers\":\[.*\]\} ]]; then
+ echo "Your cluster seems to be unresponsive at the moment: ${tm_query_result}" 1>&2
+ exit 1
+ fi
+
+ running_tms=`curl -s "http://localhost:8081/taskmanagers" | grep -o "id" | wc -l`
+
+ ${FLINK_DIR}/bin/taskmanager.sh start
+
+ for i in {1..10}; do
+ local new_running_tms=`curl -s "http://localhost:8081/taskmanagers" | grep -o "id" | wc -l`
+ if [ $((new_running_tms-running_tms)) -eq 0 ]; then
+ echo "TaskManager is not yet up."
+ else
+ echo "TaskManager is up."
+ break
+ fi
+ sleep 4
+ done
+}
+
function check_logs_for_errors {
if grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
| grep -v "RetriableCommitFailedException" \
@@ -457,3 +486,31 @@ function end_timer {
duration=$SECONDS
echo "$(($duration / 60)) minutes and $(($duration % 60)) seconds"
}
+
+function clean_stdout_files {
+ rm ${FLINK_DIR}/log/*.out
+}
+
+function clean_log_files {
+ rm ${FLINK_DIR}/log/*
+}
+
+# Expect a string to appear in the log files of the task manager before a given timeout
+# $1: expected string
+# $2: timeout in seconds
+function expect_in_taskmanager_logs {
+ local expected="$1"
+ local timeout=$2
+ local i=0
+ local logfile="${FLINK_DIR}/log/flink*taskexecutor*log"
+
+
+ while ! grep "${expected}" ${logfile} > /dev/null; do
+ sleep 1s
+ ((i++))
+ if ((i > timeout)); then
+ echo "A timeout occurred waiting for '${expected}' to appear in the taskmanager logs"
+ exit 1
+ fi
+ done
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/test-scripts/queryable_state_base.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/queryable_state_base.sh b/flink-end-to-end-tests/test-scripts/queryable_state_base.sh
new file mode 100644
index 0000000..65ddef4
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/queryable_state_base.sh
@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Moves everything matching flink-queryable-state-runtime* from opt/ to lib/
+# of the Flink distribution. Exits with 1 if the move fails.
+function link_queryable_state_lib {
+    echo "Moving flink-queryable-state-runtime from opt/ to lib/"
+    if ! mv ${FLINK_DIR}/opt/flink-queryable-state-runtime* ${FLINK_DIR}/lib/; then
+        echo "Failed to move flink-queryable-state-runtime from opt/ to lib/. Exiting"
+        exit 1
+    fi
+}
+
+# Moves everything matching flink-queryable-state-runtime* from lib/ back to opt/
+# of the Flink distribution. Exits with 1 if the move fails.
+function unlink_queryable_state_lib {
+    echo "Moving flink-queryable-state-runtime from lib/ to opt/"
+    if ! mv ${FLINK_DIR}/lib/flink-queryable-state-runtime* ${FLINK_DIR}/opt/; then
+        echo "Failed to move flink-queryable-state-runtime from lib/ to opt/. Exiting"
+        exit 1
+    fi
+}
+
+# Returns the ip address of the queryable state server
+function get_queryable_state_server_ip {
+ local ip=$(cat ${FLINK_DIR}/log/flink*taskexecutor*log \
+ | grep "Started Queryable State Server" \
+ | head -1 \
+ | awk '{split($11, a, "/"); split(a[2], b, ":"); print b[1]}')
+
+ printf "${ip} \n"
+}
+
+# Returns the ip address of the queryable state server
+function get_queryable_state_proxy_port {
+ local port=$(cat ${FLINK_DIR}/log/flink*taskexecutor*log \
+ | grep "Started Queryable State Proxy Server" \
+ | head -1 \
+ | awk '{split($12, a, "/"); split(a[2], b, ":"); split(b[2], c, "."); print c[1]}')
+
+ printf "${port} \n"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/test-scripts/test_queryable_state.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_queryable_state.sh b/flink-end-to-end-tests/test-scripts/test_queryable_state.sh
new file mode 100755
index 0000000..8d74fac
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_queryable_state.sh
@@ -0,0 +1,58 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/queryable_state_base.sh
+
+# Runs the queryable state test: moves the queryable state library to lib/, starts
+# the cluster, submits the producer job and queries its state once with the client.
+# The script exits with the exit code of the client.
+#
+# Arguments:
+#   $1: state backend that is handed to the producer job via --state-backend
+function run_test {
+    link_queryable_state_lib
+    start_cluster
+
+    QUERYABLE_STATE_PRODUCER_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateProducer.jar
+    QUERYABLE_STATE_CONSUMER_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateClient.jar
+
+    # start app with queryable state and wait for it to be available;
+    # the job id is the last word of the last line printed by "flink run"
+    JOB_ID=$(${FLINK_DIR}/bin/flink run \
+        -p 1 \
+        -d ${QUERYABLE_STATE_PRODUCER_JAR} \
+        --state-backend $1 \
+        --tmp-dir file://${TEST_DATA_DIR} \
+        | awk '{print $NF}' | tail -n 1)
+
+    wait_job_running ${JOB_ID}
+
+    # look up where the client has to connect to
+    local server_ip
+    local proxy_port
+    server_ip=$(get_queryable_state_server_ip)
+    proxy_port=$(get_queryable_state_proxy_port)
+
+    # run the client and query state the first time
+    first_result=$(java -jar ${QUERYABLE_STATE_CONSUMER_JAR} \
+        --host ${server_ip} \
+        --port ${proxy_port} \
+        --job-id ${JOB_ID})
+    EXIT_CODE=$?
+
+    # Exit
+    exit ${EXIT_CODE}
+}
+
+# Cleanup hook: moves the queryable state library back to opt/ and removes the
+# *.out files from the log directory.
+function test_cleanup {
+    unlink_queryable_state_lib
+    clean_stdout_files
+}
+
+# run the cleanup whenever the script exits, then start the test with the
+# state backend given as first script argument
+trap test_cleanup EXIT
+run_test $1
http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh b/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh
new file mode 100755
index 0000000..06199ea
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh
@@ -0,0 +1,174 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/queryable_state_base.sh
+
+# jars of the job exposing the queryable state and of the client querying it
+QUERYABLE_STATE_SERVER_JAR="${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateProducer.jar"
+QUERYABLE_STATE_CLIENT_JAR="${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateClient.jar"
+
+#####################
+# Test that queryable state works as expected with HA mode when restarting a taskmanager
+# NOTE(review): the cluster is started with start_cluster below; confirm that this
+# really runs in HA mode.
+#
+# The general outline is like this:
+# 1. start cluster in HA mode with 1 TM
+# 2. start a job that exposes queryable state from a mapstate with increasing num. of keys
+# 3. query the state with a queryable state client and expect no error to occur
+# 4. stop the TM
+# 5. check how many keys were in our mapstate at the time of the latest snapshot
+# 6. start a new TM
+# 7. query the state with a queryable state client and retrieve the number of elements
+#    in the mapstate
+# 8. expect the number of elements in the mapstate after restart of TM to be >= number of
+#    elements at last snapshot
+#
+# Globals:
+#   QUERYABLE_STATE_SERVER_JAR
+#   QUERYABLE_STATE_CLIENT_JAR
+# Arguments:
+#   None
+# Returns:
+#   None; the script exits with 0 if the test passed and with 1 otherwise
+#####################
+function run_test() {
+    local EXIT_CODE=0
+    local PARALLELISM=1 # parallelism of queryable state app
+
+    # to ensure there are no files accidentally left behind by previous tests
+    clean_log_files
+    clean_stdout_files
+
+    link_queryable_state_lib
+    start_cluster
+
+    # the job id is the last word of the last line printed by "flink run"
+    local JOB_ID
+    JOB_ID=$(${FLINK_DIR}/bin/flink run \
+        -p ${PARALLELISM} \
+        -d ${QUERYABLE_STATE_SERVER_JAR} \
+        --state-backend "rocksdb" \
+        --tmp-dir file://${TEST_DATA_DIR} \
+        | awk '{print $NF}' | tail -n 1)
+
+    wait_job_running ${JOB_ID}
+    wait_for_number_of_checkpoints ${JOB_ID} 10 60
+
+    # address and port of the queryable state endpoints, parsed from the TM logs
+    local SERVER
+    local PORT
+    SERVER=$(get_queryable_state_server_ip)
+    PORT=$(get_queryable_state_proxy_port)
+
+    echo SERVER: ${SERVER}
+    echo PORT: ${PORT}
+
+    if ! java -jar ${QUERYABLE_STATE_CLIENT_JAR} \
+        --host ${SERVER} \
+        --port ${PORT} \
+        --iterations 1 \
+        --job-id ${JOB_ID}; then
+        echo "An error occurred when executing queryable state client"
+        exit 1
+    fi
+
+    # number of completed checkpoints right before the TM is killed
+    local current_num_checkpoints
+    current_num_checkpoints=$(get_completed_number_of_checkpoints ${JOB_ID})
+
+    kill_random_taskmanager
+
+    # the job prints "Count on snapshot: <n>" to stdout on every snapshot; take the last one
+    local latest_snapshot_count
+    latest_snapshot_count=$(cat $FLINK_DIR/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}')
+    echo "Latest snapshot count was ${latest_snapshot_count}"
+
+    sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone
+
+    start_and_wait_for_tm
+
+    # wait for some more checkpoint to have happened
+    ((current_num_checkpoints+=2))
+    wait_for_number_of_checkpoints ${JOB_ID} ${current_num_checkpoints} 60
+
+    local num_entries_in_map_state_after
+    num_entries_in_map_state_after=$(java -jar ${QUERYABLE_STATE_CLIENT_JAR} \
+        --host ${SERVER} \
+        --port ${PORT} \
+        --iterations 1 \
+        --job-id ${JOB_ID} | grep "MapState has" | awk '{print $3}')
+
+    echo "after: $num_entries_in_map_state_after"
+
+    # An empty or non-numeric value would turn the arithmetic comparison below into a
+    # syntax error, which evaluates to false and would let the test pass unnoticed.
+    if ! [[ ${latest_snapshot_count} =~ ^-?[0-9]+$ && ${num_entries_in_map_state_after} =~ ^-?[0-9]+$ ]]; then
+        echo "Could not determine the number of entries: at last snapshot '${latest_snapshot_count}', after restart '${num_entries_in_map_state_after}'"
+        exit 1
+    fi
+
+    if ((latest_snapshot_count > num_entries_in_map_state_after)); then
+        echo "An error occurred"
+        echo "Expected at least ${latest_snapshot_count} entries after the restart but found ${num_entries_in_map_state_after}"
+        EXIT_CODE=1
+    fi
+
+    exit ${EXIT_CODE}
+}
+
+###################################
+# Wait a specific number of successful checkpoints
+# to have happened
+#
+# Globals:
+# None
+# Arguments:
+# $1: the job id
+# $2: the number of expected successful checkpoints
+# $3: timeout in seconds
+# Returns:
+# None
+###################################
+function wait_for_number_of_checkpoints {
+ local job_id=$1
+ local expected_num_checkpoints=$2
+ local timeout=$3
+ local count=0
+
+ echo "Starting to wait for checkpoints"
+ while (($(get_completed_number_of_checkpoints ${job_id}) < ${expected_num_checkpoints})); do
+
+ if [[ ${count} -gt ${timeout} ]]; then
+ echo "A timeout occurred waiting for successful checkpoints"
+ exit 1
+ else
+ ((count+=2))
+ fi
+
+ local current_num_checkpoints=$(get_completed_number_of_checkpoints ${job_id})
+ echo "${current_num_checkpoints}/${expected_num_checkpoints} completed checkpoints"
+ sleep 2
+ done
+}
+
+# Prints the number of completed checkpoints of a job as reported by the
+# checkpoints endpoint of the REST API on localhost:8081.
+#
+# Arguments:
+#   $1: the job id
+# Outputs:
+#   the value of the first numeric "completed" entry of the response, or
+#   nothing if the response does not contain one
+function get_completed_number_of_checkpoints {
+    local job_id=$1
+    local json_res
+    json_res=$(curl -s http://localhost:8081/jobs/${job_id}/checkpoints)
+
+    # {"counts":{"restored":0,"total":25,"in_progress":1,"completed":24,"failed":0} ...
+    # The entry is selected by its name, not by its position in the response.
+    echo "${json_res}" \
+        | grep -o '"completed":[0-9][0-9]*' \
+        | head -n 1 \
+        | cut -d ":" -f 2
+}
+
+# Cleanup hook: moves the queryable state library back to opt/ and empties the
+# log directory.
+function test_cleanup {
+    unlink_queryable_state_lib
+
+    # Killing the task manager may leave exceptions behind in the logs. They have
+    # to be removed because they would otherwise cause our test to fail in the
+    # cleanup function.
+    clean_log_files
+    clean_stdout_files
+}
+
+# run the cleanup whenever the script exits, then start the test
+trap test_cleanup EXIT
+run_test