You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/06/21 11:43:39 UTC

flink git commit: [FLINK-8982][e2e tests] Add E2E Tests for queryable state.

Repository: flink
Updated Branches:
  refs/heads/master 0bdde8377 -> 59a58e56c


[FLINK-8982][e2e tests] Add E2E Tests for queryable state.

Adds one test for normal execution and
one with a TaskManager (TM) failure scenario.

This closes #5807.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59a58e56
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59a58e56
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59a58e56

Branch: refs/heads/master
Commit: 59a58e56cc2f5f1015244ef835a014e68675234f
Parents: 0bdde83
Author: Florian Schmidt <fl...@icloud.com>
Authored: Tue Mar 13 14:13:08 2018 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Jun 21 13:42:29 2018 +0200

----------------------------------------------------------------------
 .../flink-queryable-state-test/pom.xml          | 134 ++++++++++++++
 .../streaming/tests/queryablestate/Email.java   |  77 ++++++++
 .../streaming/tests/queryablestate/EmailId.java |  73 ++++++++
 .../tests/queryablestate/EmailInformation.java  | 114 ++++++++++++
 .../tests/queryablestate/LabelSurrogate.java    |  65 +++++++
 .../tests/queryablestate/QsConstants.java       |  29 +++
 .../tests/queryablestate/QsStateClient.java     | 123 +++++++++++++
 .../tests/queryablestate/QsStateProducer.java   | 178 +++++++++++++++++++
 flink-end-to-end-tests/pom.xml                  |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh     |   3 +
 flink-end-to-end-tests/test-scripts/common.sh   |  59 +++++-
 .../test-scripts/queryable_state_base.sh        |  56 ++++++
 .../test-scripts/test_queryable_state.sh        |  58 ++++++
 .../test_queryable_state_restart_tm.sh          | 174 ++++++++++++++++++
 14 files changed, 1143 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/pom.xml b/flink-end-to-end-tests/flink-queryable-state-test/pom.xml
new file mode 100644
index 0000000..a32deea
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/pom.xml
@@ -0,0 +1,134 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+    -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-queryable-state-test_${scala.binary.version}</artifactId>
+	<name>flink-queryable-state-test</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.4</version>
+				<!-- extra JAR (project classes only) classified "QsStateProducer"; its manifest program-class is the producer job submitted to Flink -->
+				<executions>
+					<execution>
+						<id>QsStateProducer</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>QsStateProducer</classifier>
+							<archive>
+								<manifestEntries>
+									<program-class>
+										org.apache.flink.streaming.tests.queryablestate.QsStateProducer
+									</program-class>
+								</manifestEntries>
+							</archive>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<!-- dependency-bundling JAR for the standalone query client: Main-Class is QsStateClient and finalName makes the output target/QsStateClient.jar -->
+					<execution>
+						<id>QsStateClient</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadeTestJar>false</shadeTestJar>
+							<shadedArtifactAttached>false</shadedArtifactAttached>
+							<createDependencyReducedPom>false</createDependencyReducedPom>
+							<transformers>
+								<transformer
+									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>
+										org.apache.flink.streaming.tests.queryablestate.QsStateClient
+									</mainClass>
+								</transformer>
+							</transformers>
+							<finalName>QsStateClient</finalName>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!--copy the classified producer JAR to target/QsStateProducer.jar so the end-to-end test scripts can refer to it by a simple name (QsStateClient.jar is already named via the shade plugin's finalName)-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-antrun-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<execution>
+						<id>rename</id>
+						<phase>package</phase>
+						<goals>
+							<goal>run</goal>
+						</goals>
+						<configuration>
+							<target>
+								<copy
+									file="${project.basedir}/target/flink-queryable-state-test_${scala.binary.version}-${project.version}-QsStateProducer.jar"
+									tofile="${project.basedir}/target/QsStateProducer.jar"/>
+							</target>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/Email.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/Email.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/Email.java
new file mode 100644
index 0000000..c98ac6d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/Email.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * Toy email representation; the record type emitted by the test job's source.
+ */
+public class Email {
+
+	private EmailId emailId;
+	private Instant timestamp;
+	private String foo;
+	private LabelSurrogate label;
+
+	public Email(EmailId emailId, Instant timestamp, String foo, LabelSurrogate label) {
+		this.emailId = emailId;
+		this.timestamp = timestamp;
+		this.foo = foo;
+		this.label = label;
+	}
+
+	public EmailId getEmailId() {
+		return emailId;
+	}
+
+	public void setEmailId(EmailId emailId) {
+		this.emailId = emailId;
+	}
+
+	public Instant getTimestamp() {
+		return timestamp;
+	}
+
+	public void setTimestamp(Instant timestamp) {
+		this.timestamp = timestamp;
+	}
+
+	public String getFoo() {
+		return foo;
+	}
+
+	public void setFoo(String foo) {
+		this.foo = foo;
+	}
+
+	public LabelSurrogate getLabel() {
+		return label;
+	}
+
+	public void setLabel(LabelSurrogate label) {
+		this.label = label;
+	}
+
+	public String getDate() { // calendar date of the timestamp in UTC, formatted as "yyyy-MM-dd"
+		DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("UTC"));
+		return formatter.format(timestamp);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailId.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailId.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailId.java
new file mode 100644
index 0000000..63c6276
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailId.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * POJO wrapping a string email id; used as the key type of the queryable MapState.
+ */
+public class EmailId implements Serializable {
+
+	private static final long serialVersionUID = -5001464312464872467L;
+
+	private String emailId;
+
+	public EmailId() {
+		// no-arg constructor, presumably kept so Flink can treat this class as a POJO
+	}
+
+	public EmailId(String emailId) {
+		this.emailId = Objects.requireNonNull(emailId);
+	}
+
+	public void setEmailId(String emailId) {
+		this.emailId = emailId;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		EmailId emailId1 = (EmailId) o;
+
+		return Objects.equals(emailId, emailId1.emailId);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(emailId);
+	}
+
+	public String getEmailId() {
+		return emailId;
+	}
+
+	@Override
+	public String toString() {
+		return "EmailId{" +
+				"emailId='" + emailId + '\'' +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
new file mode 100644
index 0000000..b1d100a
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/EmailInformation.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * POJO with the per-email information stored as the value type of the queryable MapState.
+ */
+public class EmailInformation implements Serializable {
+
+	private static final long serialVersionUID = -8956979869800484909L;
+
+	public void setEmailId(EmailId emailId) {
+		this.emailId = emailId;
+	}
+
+	private EmailId emailId;
+
+	public void setStuff(List<String> stuff) {
+		this.stuff = stuff;
+	}
+
+	private List<String> stuff;
+
+	public void setAsdf(Long asdf) {
+		this.asdf = asdf;
+	}
+
+	private Long asdf = 0L;
+
+	private transient LabelSurrogate label; // NOTE(review): transient, yet used in equals()/hashCode(); LabelSurrogate has no equals(), so it compares by identity
+
+	public EmailInformation() {
+		// no-arg constructor, presumably kept so Flink can treat this class as a POJO
+	}
+
+	public EmailInformation(Email email) {
+		emailId = email.getEmailId();
+		stuff = new ArrayList<>();
+		stuff.add("1");
+		stuff.add("2");
+		stuff.add("3");
+		label = email.getLabel();
+	}
+
+	public EmailId getEmailId() {
+		return emailId;
+	}
+
+	public List<String> getStuff() {
+		return stuff;
+	}
+
+	public Long getAsdf() {
+		return asdf;
+	}
+
+	public LabelSurrogate getLabel() {
+		return label;
+	}
+
+	public void setLabel(LabelSurrogate label) {
+		this.label = label;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		EmailInformation that = (EmailInformation) o;
+		return Objects.equals(emailId, that.emailId) &&
+				Objects.equals(stuff, that.stuff) &&
+				Objects.equals(asdf, that.asdf) &&
+				Objects.equals(label, that.label);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(emailId, stuff, asdf, label);
+	}
+
+	@Override
+	public String toString() {
+		return "EmailInformation{" +
+				"emailId=" + emailId +
+				", stuff=" + stuff +
+				", asdf=" + asdf +
+				", label=" + label +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
new file mode 100644
index 0000000..0977ef0
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/LabelSurrogate.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+/**
+ * A label attached to an {@link Email}; it does not override equals() or hashCode().
+ */
+public class LabelSurrogate {
+
+	private Type type;
+	private String foo;
+
+	public LabelSurrogate(Type type, String foo) {
+		this.type = type;
+		this.foo = foo;
+	}
+
+	public Type getType() {
+		return type;
+	}
+
+	public void setType(Type type) {
+		this.type = type;
+	}
+
+	public String getFoo() {
+		return foo;
+	}
+
+	public void setFoo(String foo) {
+		this.foo = foo;
+	}
+
+	@Override
+	public String toString() {
+		return "LabelSurrogate{" +
+				"type=" + type +
+				", foo='" + foo + '\'' +
+				'}';
+	}
+
+	/**
+	 * The possible label types.
+	 */
+	public enum Type {
+		FOO,
+		BAR,
+		BAZ
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsConstants.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsConstants.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsConstants.java
new file mode 100644
index 0000000..7b26226
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsConstants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+/**
+ * Constants shared by the state producer job ({@link QsStateProducer}) and the query client ({@link QsStateClient}).
+ */
+public class QsConstants {
+
+	public static final String QUERY_NAME = "state"; // name under which the MapState is made queryable
+	public static final String STATE_NAME = "state"; // name of the MapStateDescriptor
+
+	public static final String KEY = ""; // the single key every record is keyed by, and the key the client queries
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java
new file mode 100644
index 0000000..b0b8ced
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateClient.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A simple implementation of a queryable state client.
+ * This client queries the state for a while (~2.5 mins with the default iterations) and prints
+ * out the number of entries it found in the map state after each query.
+ *
+ * <p>Usage: java -jar QsStateClient.jar --job-id JOB_ID [--host HOST (default localhost)] [--port PORT (default 9069)] [--iterations N (default 1500)]
+ */
+public class QsStateClient {
+
+	private static final int BOOTSTRAP_RETRIES = 240; // 240 retries with a 500ms sleep each: ~120s to wait for the state to appear
+
+	public static void main(final String[] args) throws Exception {
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+
+		// setup values
+		String jobId = parameters.getRequired("job-id");
+		String host = parameters.get("host", "localhost");
+		int port = parameters.getInt("port", 9069);
+		int numIterations = parameters.getInt("iterations", 1500);
+
+		QueryableStateClient client = new QueryableStateClient(host, port); // NOTE(review): never shut down; consider shutting it down (e.g. shutdownAndWait(), if available in this Flink version) before main returns
+		client.setExecutionConfig(new ExecutionConfig());
+
+		MapStateDescriptor<EmailId, EmailInformation> stateDescriptor =
+				new MapStateDescriptor<>(
+						QsConstants.STATE_NAME,
+						TypeInformation.of(new TypeHint<EmailId>() {
+
+						}),
+						TypeInformation.of(new TypeHint<EmailInformation>() {
+
+						})
+				);
+
+		// wait for state to exist
+		for (int i = 0; i < BOOTSTRAP_RETRIES; i++) { // ~120s
+			try {
+				getMapState(jobId, client, stateDescriptor);
+				break;
+			} catch (ExecutionException e) {
+				if (e.getCause() instanceof UnknownKeyOrNamespaceException) { // no entry for QsConstants.KEY yet; any other cause is rethrown immediately
+					System.err.println("State does not exist yet; sleeping 500ms");
+					Thread.sleep(500L);
+				} else {
+					throw e;
+				}
+			}
+
+			if (i == (BOOTSTRAP_RETRIES - 1)) {
+				throw new RuntimeException("Timeout: state doesn't exist after 120s");
+			}
+		}
+
+		// query state
+		for (int iterations = 0; iterations < numIterations; iterations++) {
+
+			MapState<EmailId, EmailInformation> mapState =
+				getMapState(jobId, client, stateDescriptor);
+
+			int counter = 0;
+			for (Map.Entry<EmailId, EmailInformation> entry: mapState.entries()) {
+				// this is to force deserialization
+				entry.getKey();
+				entry.getValue();
+				counter++;
+			}
+			System.out.println("MapState has " + counter + " entries"); // we look for it in the test
+
+			Thread.sleep(100L);
+		}
+	}
+
+	private static MapState<EmailId, EmailInformation> getMapState(
+			String jobId,
+			QueryableStateClient client,
+			MapStateDescriptor<EmailId, EmailInformation> stateDescriptor) throws InterruptedException, ExecutionException {
+		// blocks until the query completes; a failed query surfaces as an ExecutionException
+		CompletableFuture<MapState<EmailId, EmailInformation>> resultFuture =
+				client.getKvState(
+						JobID.fromHexString(jobId),
+						QsConstants.QUERY_NAME,
+						QsConstants.KEY, // which key of the keyed state to access
+						BasicTypeInfo.STRING_TYPE_INFO,
+						stateDescriptor);
+
+		return resultFuture.get();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java
new file mode 100644
index 0000000..14eaac3
--- /dev/null
+++ b/flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.queryablestate;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * Streaming application that creates {@link Email} records with random ids and increasing
+ * timestamps and passes them to a stateful {@link org.apache.flink.api.common.functions.FlatMapFunction},
+ * where they are exposed as queryable state.
+ */
+public class QsStateProducer {
+
+	public static void main(final String[] args) throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		// --tmp-dir: checkpoint directory for the rocksdb/fs backends (unused by memory); --state-backend: rocksdb | fs | memory
+		ParameterTool tool = ParameterTool.fromArgs(args);
+		String tmpPath = tool.getRequired("tmp-dir");
+		String stateBackendType = tool.getRequired("state-backend");
+
+		StateBackend stateBackend;
+		switch (stateBackendType) {
+			case "rocksdb":
+				stateBackend = new RocksDBStateBackend(tmpPath);
+				break;
+			case "fs":
+				stateBackend = new FsStateBackend(tmpPath);
+				break;
+			case "memory":
+				stateBackend = new MemoryStateBackend();
+				break;
+			default:
+				throw new RuntimeException("Unsupported state backend " + stateBackendType);
+		}
+
+		env.setStateBackend(stateBackend);
+		env.enableCheckpointing(1000L);
+		env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+		env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);
+		// every record gets the same key, so all emails land in one MapState handled by a single flatMap instance
+		env.addSource(new EmailSource())
+			.keyBy(new KeySelector<Email, String>() {
+
+				private static final long serialVersionUID = -1480525724620425363L;
+
+				@Override
+				public String getKey(Email value) throws Exception {
+					return QsConstants.KEY;
+				}
+			})
+			.flatMap(new TestFlatMap());
+
+		env.execute();
+	}
+
+	private static class EmailSource extends RichSourceFunction<Email> {
+
+		private static final long serialVersionUID = -7286937645300388040L;
+
+		private transient volatile boolean isRunning; // set in open(), cleared by cancel()
+
+		private transient Random random;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			this.random = new Random();
+			this.isRunning = true;
+		}
+
+		@Override
+		public void run(SourceContext<Email> ctx) throws Exception {
+			// Sleep for 10 seconds on start to allow time to copy jobid
+			Thread.sleep(10000L);
+
+			int types = LabelSurrogate.Type.values().length;
+
+			while (isRunning) {
+				int r = random.nextInt(100);
+
+				final EmailId emailId = new EmailId(Integer.toString(random.nextInt()));
+				final Instant timestamp = Instant.now().minus(Duration.ofDays(1L));
+				final String foo = String.format("foo #%d", r);
+				final LabelSurrogate label = new LabelSurrogate(LabelSurrogate.Type.values()[r % types], "bar");
+
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collect(new Email(emailId, timestamp, foo, label));
+				}
+
+				Thread.sleep(30L); // throttles the source to at most ~33 emails per second
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	private static class TestFlatMap extends RichFlatMapFunction<Email, Object> implements CheckpointedFunction {
+		// emits nothing; keeps every email in a queryable MapState and prints the state size on each checkpoint
+		private static final long serialVersionUID = 7821128115999005941L;
+
+		private transient MapState<EmailId, EmailInformation> state;
+		private transient int count; // MapState size after the last flatMap(); reset to -1 in open(), so snapshots taken after a restore print -1 until the next record
+
+		@Override
+		public void open(Configuration parameters) {
+			MapStateDescriptor<EmailId, EmailInformation> stateDescriptor =
+					new MapStateDescriptor<>(
+							QsConstants.STATE_NAME,
+							TypeInformation.of(new TypeHint<EmailId>() {
+
+							}),
+							TypeInformation.of(new TypeHint<EmailInformation>() {
+
+							})
+					);
+			stateDescriptor.setQueryable(QsConstants.QUERY_NAME);
+			state = getRuntimeContext().getMapState(stateDescriptor);
+			count = -1;
+		}
+
+		@Override
+		public void flatMap(Email value, Collector<Object> out) throws Exception {
+			state.put(value.getEmailId(), new EmailInformation(value));
+			count = Iterables.size(state.keys()); // iterates over all keys of the map on every record
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) {
+			System.out.println("Count on snapshot: " + count); // we look for it in the test
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) {
+			// intentionally empty: CheckpointedFunction is presumably implemented only for the snapshotState() hook
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 8fb7eb8..c169050 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -43,6 +43,7 @@ under the License.
 		<module>flink-distributed-cache-via-blob-test</module>
 		<module>flink-high-parallelism-iterations-test</module>
 		<module>flink-stream-stateful-job-upgrade-test</module>
+		<module>flink-queryable-state-test</module>
 		<module>flink-local-recovery-and-allocation-test</module>
 		<module>flink-elasticsearch1-test</module>
 		<module>flink-elasticsearch2-test</module>

http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index cf70558..c4c5069 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -43,6 +43,9 @@ echo "Flink distribution directory: $FLINK_DIR"
 
 # run_test "<description>" "$END_TO_END_DIR/test-scripts/<script_name>"
 
+# Queryable state: test_queryable_state.sh takes the state backend as its argument,
+# test_queryable_state_restart_tm.sh always runs with the RocksDB backend.
+run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state.sh rocksdb"
+run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh"
+
 run_test "Running HA (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file true false"
 run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh file false false"
 run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false"

http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 3498b56..610be2d 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -113,6 +113,9 @@ function create_ha_config() {
     #==============================================================================
 
     rest.port: 8081
+
+    query.server.ports: 9000-9009
+    query.proxy.ports: 9010-9019
 EOL
 }
 
@@ -137,7 +140,7 @@ function start_local_zk {
             address=${BASH_REMATCH[2]}
 
             if [ "${address}" != "localhost" ]; then
-                echo "[ERROR] Parse error. Only available for localhost."
+                echo "[ERROR] Parse error. Only available for localhost. Expected address 'localhost' but got '${address}'"
                 exit 1
             fi
             ${FLINK_DIR}/bin/zookeeper.sh start $id
@@ -167,6 +170,32 @@ function start_cluster {
   done
 }
 
+# Starts one additional TaskManager and waits (10 polls, 4s apart) until the REST API
+# lists more TaskManagers than before. Exits with status 1 if the cluster does not
+# answer or if the new TaskManager does not show up in time.
+function start_and_wait_for_tm {
+  local tm_query_result
+  tm_query_result=$(curl -s "http://localhost:8081/taskmanagers")
+
+  # we assume that the cluster is running
+  if ! [[ ${tm_query_result} =~ \{\"taskmanagers\":\[.*\]\} ]]; then
+    echo "Your cluster seems to be unresponsive at the moment: ${tm_query_result}" 1>&2
+    exit 1
+  fi
+
+  # count the TMs in the response we already have instead of querying again
+  local running_tms
+  running_tms=$(grep -o "id" <<< "${tm_query_result}" | wc -l)
+
+  "${FLINK_DIR}"/bin/taskmanager.sh start
+
+  local i new_running_tms
+  for i in {1..10}; do
+    new_running_tms=$(curl -s "http://localhost:8081/taskmanagers" | grep -o "id" | wc -l)
+    # only an increase means a new TM registered; a decrease is a TM going away
+    if ((new_running_tms > running_tms)); then
+      echo "TaskManager is up."
+      return 0
+    fi
+    echo "TaskManager is not yet up."
+    sleep 4
+  done
+
+  echo "TaskManager did not come up in time." 1>&2
+  exit 1
+}
+
 function check_logs_for_errors {
   if grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
       | grep -v "RetriableCommitFailedException" \
@@ -457,3 +486,31 @@ function end_timer {
     duration=$SECONDS
     echo "$(($duration / 60)) minutes and $(($duration % 60)) seconds"
 }
+
+# Removes the *.out files from ${FLINK_DIR}/log. Succeeds quietly when there are
+# none; aborts if FLINK_DIR is unset or empty instead of touching /log.
+function clean_stdout_files {
+    rm -f -- "${FLINK_DIR:?}"/log/*.out
+}
+
+# Removes all files from ${FLINK_DIR}/log. Succeeds quietly when the directory is
+# empty; aborts if FLINK_DIR is unset or empty instead of touching /log.
+function clean_log_files {
+    rm -f -- "${FLINK_DIR:?}"/log/*
+}
+
+# Expect a string to appear in the log files of the task manager before a given timeout.
+# Exits the script with status 1 if it has not appeared once the timeout has passed.
+# $1: expected string (a grep basic regular expression)
+# $2: timeout in seconds
+function expect_in_taskmanager_logs {
+    local expected="$1"
+    local timeout=$2
+    local i=0
+
+    # The glob is expanded anew on every poll because the log file may not exist yet;
+    # -s keeps grep quiet about the missing file in the meantime.
+    while ! grep -qs -- "${expected}" "${FLINK_DIR}"/log/flink*taskexecutor*log; do
+        sleep 1s
+        i=$((i + 1))
+        if ((i > timeout)); then
+            echo "A timeout occurred waiting for '${expected}' to appear in the taskmanager logs"
+            exit 1
+        fi
+    done
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/test-scripts/queryable_state_base.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/queryable_state_base.sh b/flink-end-to-end-tests/test-scripts/queryable_state_base.sh
new file mode 100644
index 0000000..65ddef4
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/queryable_state_base.sh
@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Moves the flink-queryable-state-runtime jar(s) from opt/ into lib/ of the Flink
+# distribution so that a cluster started afterwards has them on its classpath.
+# Exits the script with status 1 if the move fails.
+function link_queryable_state_lib {
+    echo "Moving flink-queryable-state-runtime from opt/ to lib/"
+    if ! mv "${FLINK_DIR}"/opt/flink-queryable-state-runtime* "${FLINK_DIR}"/lib/; then
+        echo "Failed to move flink-queryable-state-runtime from opt/ to lib/. Exiting"
+        exit 1
+    fi
+}
+
+# Moves the flink-queryable-state-runtime jar(s) from lib/ back to opt/, undoing
+# link_queryable_state_lib. Exits the script with status 1 if the move fails.
+function unlink_queryable_state_lib {
+    echo "Moving flink-queryable-state-runtime from lib/ to opt/"
+    if ! mv "${FLINK_DIR}"/lib/flink-queryable-state-runtime* "${FLINK_DIR}"/opt/; then
+        echo "Failed to move flink-queryable-state-runtime from lib/ to opt/. Exiting"
+        exit 1
+    fi
+}
+
+# Prints the ip address of the queryable state server, parsed from the first
+# "Started Queryable State Server" line in the taskexecutor logs
+# (field 11 of that line, between "/" and ":").
+function get_queryable_state_server_ip {
+    local ip
+    ip=$(grep -h "Started Queryable State Server" "${FLINK_DIR}"/log/flink*taskexecutor*log \
+        | head -1 \
+        | awk '{split($11, a, "/"); split(a[2], b, ":"); print b[1]}')
+
+    # never use the data as the printf format string
+    printf '%s\n' "${ip}"
+}
+
+# Prints the port of the queryable state proxy server, parsed from the first
+# "Started Queryable State Proxy Server" line in the taskexecutor logs
+# (field 12 of that line, after the ":" and before the trailing ".").
+function get_queryable_state_proxy_port {
+    local port
+    port=$(grep -h "Started Queryable State Proxy Server" "${FLINK_DIR}"/log/flink*taskexecutor*log \
+        | head -1 \
+        | awk '{split($12, a, "/"); split(a[2], b, ":"); split(b[2], c, "."); print c[1]}')
+
+    # never use the data as the printf format string
+    printf '%s\n' "${port}"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/test-scripts/test_queryable_state.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_queryable_state.sh b/flink-end-to-end-tests/test-scripts/test_queryable_state.sh
new file mode 100755
index 0000000..8d74fac
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_queryable_state.sh
@@ -0,0 +1,58 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/queryable_state_base.sh
+
+# Starts a cluster with queryable state enabled, submits the producer job with the
+# given state backend and queries its state once with the client.
+# $1: state backend passed to the producer job (e.g. "rocksdb")
+# Exits with the exit code of the queryable state client.
+function run_test {
+    local state_backend=$1
+
+    link_queryable_state_lib
+    start_cluster
+
+    QUERYABLE_STATE_PRODUCER_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateProducer.jar
+    QUERYABLE_STATE_CONSUMER_JAR=${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateClient.jar
+
+    # start app with queryable state and wait for it to be available
+    JOB_ID=$("${FLINK_DIR}"/bin/flink run \
+        -p 1 \
+        -d "${QUERYABLE_STATE_PRODUCER_JAR}" \
+        --state-backend "${state_backend}" \
+        --tmp-dir "file://${TEST_DATA_DIR}" \
+        | awk '{print $NF}' | tail -n 1)
+
+    wait_job_running "${JOB_ID}"
+
+    # Run the client once. Its output is not captured so that it ends up in the
+    # test log when something goes wrong. The host/port substitutions stay
+    # unquoted on purpose: word splitting drops surrounding whitespace.
+    java -jar "${QUERYABLE_STATE_CONSUMER_JAR}" \
+        --host $(get_queryable_state_server_ip) \
+        --port $(get_queryable_state_proxy_port) \
+        --job-id "${JOB_ID}"
+
+    EXIT_CODE=$?
+
+    # Exit
+    exit ${EXIT_CODE}
+}
+
+# Cleanup run on exit (see the trap below): moves the queryable state runtime
+# jar back from lib/ to opt/ and removes the *.out files from the log directory.
+function test_cleanup {
+    unlink_queryable_state_lib
+    clean_stdout_files
+}
+
+# $1: state backend for the producer job (e.g. "rocksdb")
+trap test_cleanup EXIT
+run_test "$1"

http://git-wip-us.apache.org/repos/asf/flink/blob/59a58e56/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh b/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh
new file mode 100755
index 0000000..06199ea
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_queryable_state_restart_tm.sh
@@ -0,0 +1,174 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/queryable_state_base.sh
+
+# jars built by the flink-queryable-state-test module
+readonly QUERYABLE_STATE_SERVER_JAR="${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateProducer.jar"
+readonly QUERYABLE_STATE_CLIENT_JAR="${TEST_INFRA_DIR}/../../flink-end-to-end-tests/flink-queryable-state-test/target/QsStateClient.jar"
+
+#####################
+# Test that queryable state works as expected with HA mode when restarting a taskmanager
+#
+# The general outline is like this:
+# 1. start cluster in HA mode with 1 TM
+# 2. start a job that exposes queryable state from a mapstate with increasing num. of keys
+# 3. query the state with a queryable state client and expect no error to occur
+# 4. stop the TM
+# 5. check how many keys were in our mapstate at the time of the latest snapshot
+# 6. start a new TM
+# 7. query the state with a queryable state client and retrieve the number of elements
+#    in the mapstate
+# 8. expect the number of elements in the mapstate after restart of TM to be >= number of
+#    elements at last snapshot (the test also fails if either number cannot be determined)
+#
+# Globals:
+#   QUERYABLE_STATE_SERVER_JAR
+#   QUERYABLE_STATE_CLIENT_JAR
+# Arguments:
+#   None
+# Returns:
+#   None (exits with 0 on success, 1 on failure)
+#####################
+function run_test() {
+    local EXIT_CODE=0
+    local PARALLELISM=1 # parallelism of queryable state app
+
+    # to ensure there are no files accidentally left behind by previous tests
+    clean_log_files
+    clean_stdout_files
+
+    link_queryable_state_lib
+    start_cluster
+
+    local JOB_ID
+    JOB_ID=$("${FLINK_DIR}"/bin/flink run \
+        -p ${PARALLELISM} \
+        -d "${QUERYABLE_STATE_SERVER_JAR}" \
+        --state-backend "rocksdb" \
+        --tmp-dir "file://${TEST_DATA_DIR}" \
+        | awk '{print $NF}' | tail -n 1)
+
+    wait_job_running "${JOB_ID}"
+    wait_for_number_of_checkpoints "${JOB_ID}" 10 60
+
+    # SERVER and PORT are expanded unquoted below on purpose: word splitting
+    # drops any surrounding whitespace printed by the helper functions.
+    local SERVER PORT
+    SERVER=$(get_queryable_state_server_ip)
+    PORT=$(get_queryable_state_proxy_port)
+
+    echo SERVER: ${SERVER}
+    echo PORT: ${PORT}
+
+    if ! java -jar "${QUERYABLE_STATE_CLIENT_JAR}" \
+        --host ${SERVER} \
+        --port ${PORT} \
+        --iterations 1 \
+        --job-id "${JOB_ID}"; then
+        echo "An error occurred when executing queryable state client"
+        exit 1
+    fi
+
+    # Must hold a bare number: it is incremented arithmetically further down.
+    local current_num_checkpoints
+    current_num_checkpoints=$(get_completed_number_of_checkpoints "${JOB_ID}")
+    if ! [[ ${current_num_checkpoints} =~ ^[0-9]+$ ]]; then
+        echo "Could not determine the number of completed checkpoints: '${current_num_checkpoints}'"
+        exit 1
+    fi
+
+    kill_random_taskmanager
+
+    # the producer prints "Count on snapshot: <n>" on every snapshot; take the latest one
+    local latest_snapshot_count
+    latest_snapshot_count=$(cat "${FLINK_DIR}"/log/*out* | grep "on snapshot" | tail -n 1 | awk '{print $4}')
+    echo "Latest snapshot count was ${latest_snapshot_count}"
+
+    sleep 65 # this is a little longer than the heartbeat timeout so that the TM is gone
+
+    start_and_wait_for_tm
+
+    # wait for some more checkpoints to have happened
+    current_num_checkpoints=$((current_num_checkpoints + 2))
+    wait_for_number_of_checkpoints "${JOB_ID}" "${current_num_checkpoints}" 60
+
+    local num_entries_in_map_state_after
+    num_entries_in_map_state_after=$(java -jar "${QUERYABLE_STATE_CLIENT_JAR}" \
+        --host ${SERVER} \
+        --port ${PORT} \
+        --iterations 1 \
+        --job-id "${JOB_ID}" | grep "MapState has" | awk '{print $3}')
+
+    echo "after: $num_entries_in_map_state_after"
+
+    # Validate both values first: an empty or non-numeric operand would turn the
+    # arithmetic comparison below into a syntax error instead of a clear failure.
+    if ! [[ ${latest_snapshot_count} =~ ^-?[0-9]+$ ]] || ! [[ ${num_entries_in_map_state_after} =~ ^[0-9]+$ ]]; then
+        echo "Could not determine the number of entries (at last snapshot: '${latest_snapshot_count}', after restart: '${num_entries_in_map_state_after}')"
+        EXIT_CODE=1
+    elif ((latest_snapshot_count > num_entries_in_map_state_after)); then
+        echo "An error occurred"
+        EXIT_CODE=1
+    fi
+
+    exit ${EXIT_CODE}
+}
+
+###################################
+# Wait a specific number of successful checkpoints
+# to have happened
+#
+# Globals:
+#   None
+# Arguments:
+#   $1: the job id
+#   $2: the number of expected successful checkpoints
+#   $3: timeout in seconds
+# Returns:
+#   None (exits the script with status 1 on timeout)
+###################################
+function wait_for_number_of_checkpoints {
+    local job_id=$1
+    local expected_num_checkpoints=$2
+    local timeout=$3
+    local count=0
+    local current_num_checkpoints
+
+    echo "Starting to wait for checkpoints"
+    while true; do
+        current_num_checkpoints=$(get_completed_number_of_checkpoints "${job_id}")
+        # A missing or non-numeric answer (e.g. the REST call failed) counts as
+        # "no checkpoints yet" so that it cannot break the arithmetic below.
+        if ! [[ ${current_num_checkpoints} =~ ^[0-9]+$ ]]; then
+            current_num_checkpoints=0
+        fi
+
+        if ((current_num_checkpoints >= expected_num_checkpoints)); then
+            break
+        fi
+
+        if ((count > timeout)); then
+            echo "A timeout occurred waiting for successful checkpoints"
+            exit 1
+        fi
+        count=$((count + 2))
+
+        echo "${current_num_checkpoints}/${expected_num_checkpoints} completed checkpoints"
+        sleep 2
+    done
+}
+
+###################################
+# Prints the number of completed checkpoints of a job, read from the
+# REST endpoint /jobs/<job_id>/checkpoints. Prints nothing if no numeric
+# "completed" field is found in the response.
+#
+# Arguments:
+#   $1: the job id
+###################################
+function get_completed_number_of_checkpoints {
+    local job_id=$1
+    local json_res
+    json_res=$(curl -s "http://localhost:8081/jobs/${job_id}/checkpoints")
+
+    # {"counts":{"restored":0,"total":25,"in_progress":1,"completed":24,"failed":0} ...
+    # Match the field by name rather than by position so that the result does not
+    # depend on the key order; the first numeric "completed" value is the one under
+    # "counts" in the sample response above.
+    grep -o '"completed":[0-9][0-9]*' <<< "${json_res}" \
+        | head -n 1 \
+        | cut -d ":" -f 2
+}
+
+# Cleanup run on exit (see the trap below): moves the queryable state runtime
+# jar back to opt/ and removes all log and .out files of this run.
+function test_cleanup {
+    unlink_queryable_state_lib
+
+    # this is needed b.c. otherwise we might have exceptions from when
+    # we kill the task manager left behind in the logs, which would cause
+    # our test to fail in the cleanup function
+    clean_log_files
+    clean_stdout_files
+}
+
+# run the cleanup on every exit path, including the explicit exits in run_test
+trap test_cleanup EXIT
+run_test