You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/11 10:19:00 UTC

[GitHub] klion26 closed pull request #6430: [FLINK-8058][Queryable State]Queryable state should check types

klion26 closed pull request #6430: [FLINK-8058][Queryable State]Queryable state should check types
URL: https://github.com/apache/flink/pull/6430
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub would not otherwise
show its diff, so the diff is supplied below:

diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 470c7acf9d7..70f2c31642c 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -263,14 +263,16 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
 		stateDescriptor.initializeSerializerUnlessSet(executionConfig);
 
 		final byte[] serializedKeyAndNamespace;
+		final byte[] serializedStateDescriptor;
 		try {
 			serializedKeyAndNamespace = KvStateSerializer
 					.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
+			serializedStateDescriptor = KvStateSerializer.serializedStateDescriptor(stateDescriptor);
 		} catch (IOException e) {
 			return FutureUtils.getFailedFuture(e);
 		}
 
-		return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
+		return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace, serializedStateDescriptor)
 			.thenApply(stateResponse -> createState(stateResponse, stateDescriptor));
 	}
 
@@ -306,10 +308,12 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
 			final JobID jobId,
 			final String queryableStateName,
 			final int keyHashCode,
-			final byte[] serializedKeyAndNamespace) {
+			final byte[] serializedKeyAndNamespace,
+			final byte[] serializedStateDescriptor) {
 		LOG.debug("Sending State Request to {}.", remoteAddress);
 		try {
-			KvStateRequest request = new KvStateRequest(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace);
+			KvStateRequest request = new KvStateRequest(jobId, queryableStateName, keyHashCode,
+											serializedKeyAndNamespace, serializedStateDescriptor);
 			return client.sendRequest(remoteAddress, request);
 		} catch (Exception e) {
 			LOG.error("Unable to send KVStateRequest: ", e);
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
index 4a64678e550..0caf68b28a7 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
@@ -18,12 +18,17 @@
 
 package org.apache.flink.queryablestate.client.state.serialization;
 
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -264,4 +269,39 @@
 			return null;
 		}
 	}
+
+	/**
+	 * Serializes the given state descriptor into a byte array using Java serialization.
+	 *
+	 * @param stateDescriptor the state descriptor to serialize
+	 * @return the serialized form of the descriptor
+	 * @throws IOException on failure during serialization
+	 */
+	public static byte[] serializedStateDescriptor(StateDescriptor<?, ?> stateDescriptor) throws IOException {
+		final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+		try (ObjectOutputStream objectOut = new ObjectOutputStream(bytes)) {
+			objectOut.writeObject(stateDescriptor);
+		}
+
+		// Snapshot the buffer only after the object stream has been closed.
+		return bytes.toByteArray();
+	}
+
+	/**
+	 * Deserializes a {@link StateDescriptor} from bytes written with Java serialization.
+	 *
+	 * @param serializedValue the serialized descriptor, may be {@code null}
+	 * @return the deserialized descriptor, or {@code null} if {@code serializedValue} is {@code null}
+	 * @throws IOException on failure during deserialization
+	 * @throws ClassNotFoundException if the class of the serialized object cannot be found
+	 */
+	public static StateDescriptor<?, ?> deserializeStateDescriptor(byte[] serializedValue) throws IOException, ClassNotFoundException {
+		if (serializedValue == null) {
+			return null;
+		}
+		// NOTE(review): native Java deserialization -- confirm the bytes come from a trusted source.
+		try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedValue))) {
+			return (StateDescriptor<?, ?>) in.readObject();
+		}
+	}
 }
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
index 2e2ea6a3fcb..11144e1bf28 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
@@ -41,17 +41,20 @@
 	private final String stateName;
 	private final int keyHashCode;
 	private final byte[] serializedKeyAndNamespace;
+	private final byte[] serializedStateDescriptor;
 
 	public KvStateRequest(
 			final JobID jobId,
 			final String stateName,
 			final int keyHashCode,
-			final byte[] serializedKeyAndNamespace) {
+			final byte[] serializedKeyAndNamespace,
+			final byte[] serializedStateDescriptor) {
 
 		this.jobId = Preconditions.checkNotNull(jobId);
 		this.stateName = Preconditions.checkNotNull(stateName);
 		this.keyHashCode = keyHashCode;
 		this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace);
+		this.serializedStateDescriptor = Preconditions.checkNotNull(serializedStateDescriptor); // serialize() reads its length
 	}
 
 	public JobID getJobId() {
@@ -70,17 +73,23 @@ public int getKeyHashCode() {
 		return serializedKeyAndNamespace;
 	}
 
+	public byte[] getSerializedStateDescriptor() {
+		return serializedStateDescriptor;
+	}
+
 	@Override
 	public byte[] serialize() {
 
 		byte[] serializedStateName = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
 
 		// JobID + stateName + sizeOf(stateName) + hashCode + keyAndNamespace + sizeOf(keyAndNamespace)
+		// + stateDescriptor + sizeOf(stateDescriptor)
 		final int size =
 				JobID.SIZE +
 				serializedStateName.length + Integer.BYTES +
 				Integer.BYTES +
-				serializedKeyAndNamespace.length + Integer.BYTES;
+				serializedKeyAndNamespace.length + Integer.BYTES +
+				serializedStateDescriptor.length + Integer.BYTES;
 
 		return ByteBuffer.allocate(size)
 				.putLong(jobId.getLowerPart())
@@ -90,6 +99,8 @@ public int getKeyHashCode() {
 				.putInt(keyHashCode)
 				.putInt(serializedKeyAndNamespace.length)
 				.put(serializedKeyAndNamespace)
+				.putInt(serializedStateDescriptor.length)
+				.put(serializedStateDescriptor)
 				.array();
 	}
 
@@ -135,7 +146,17 @@ public KvStateRequest deserializeMessage(ByteBuf buf) {
 			if (knamespaceLength > 0) {
 				buf.readBytes(serializedKeyAndNamespace);
 			}
-			return new KvStateRequest(jobId, stateName, keyHashCode, serializedKeyAndNamespace);
+
+			int descriptorLength = buf.readInt();
+			Preconditions.checkArgument(descriptorLength >= 0,
+				"Negative length for stateDescriptor. " +
+				"This indicates a serialization error.");
+
+			byte[] serializedStateDescriptor = new byte[descriptorLength];
+			if (descriptorLength > 0) {
+				buf.readBytes(serializedStateDescriptor);
+			}
+			return new KvStateRequest(jobId, stateName, keyHashCode, serializedKeyAndNamespace, serializedStateDescriptor);
 		}
 	}
 }
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index 1586566bb17..4dbfcd7244b 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -171,7 +171,7 @@ private void executeActionAsync(
 						// Query server
 						final KvStateID kvStateId = location.getKvStateID(keyGroupIndex);
 						final KvStateInternalRequest internalRequest = new KvStateInternalRequest(
-								kvStateId, request.getSerializedKeyAndNamespace());
+								kvStateId, request.getSerializedKeyAndNamespace(), request.getSerializedStateDescriptor());
 						return kvStateClient.sendRequest(serverAddress, internalRequest);
 					}
 				}, queryExecutor);
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
index 8c8de59a6e6..28ade944019 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
@@ -38,13 +38,16 @@
 
 	private final KvStateID kvStateId;
 	private final byte[] serializedKeyAndNamespace;
+	private final byte[] serializedStateDescriptor;
 
 	public KvStateInternalRequest(
 			final KvStateID stateId,
-			final byte[] serializedKeyAndNamespace) {
+			final byte[] serializedKeyAndNamespace,
+			final byte[] serializedStateDescriptor) {
 
 		this.kvStateId = Preconditions.checkNotNull(stateId);
 		this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace);
+		this.serializedStateDescriptor = Preconditions.checkNotNull(serializedStateDescriptor);
 	}
 
 	public KvStateID getKvStateId() {
@@ -55,17 +58,25 @@ public KvStateID getKvStateId() {
 		return serializedKeyAndNamespace;
 	}
 
+	public byte[] getSerializedStateDescriptor() {
+		return serializedStateDescriptor;
+	}
+
 	@Override
 	public byte[] serialize() {
 
 		// KvStateId + sizeOf(serializedKeyAndNamespace) + serializedKeyAndNamespace
-		final int size = KvStateID.SIZE + Integer.BYTES + serializedKeyAndNamespace.length;
+		// + sizeOf(serializedStateDescriptor) + serializedStateDescriptor
+		final int size = KvStateID.SIZE + Integer.BYTES + serializedKeyAndNamespace.length
+						+ Integer.BYTES + serializedStateDescriptor.length;
 
 		return ByteBuffer.allocate(size)
 				.putLong(kvStateId.getLowerPart())
 				.putLong(kvStateId.getUpperPart())
 				.putInt(serializedKeyAndNamespace.length)
 				.put(serializedKeyAndNamespace)
+				.putInt(serializedStateDescriptor.length)
+				.put(serializedStateDescriptor)
 				.array();
 	}
 
@@ -87,7 +98,17 @@ public KvStateInternalRequest deserializeMessage(ByteBuf buf) {
 			if (length > 0) {
 				buf.readBytes(serializedKeyAndNamespace);
 			}
-			return new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+
+			int descriptorLength = buf.readInt();
+			Preconditions.checkArgument(descriptorLength >= 0,
+							"Negative length for key and namespace. " +
+							"This indicates a serialization error.");
+			byte[] serializedStateDescriptor = new byte[descriptorLength];
+			if (descriptorLength > 0) {
+				buf.readBytes(serializedStateDescriptor);
+			}
+
+			return new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace, serializedStateDescriptor);
 		}
 	}
 }
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
index d46deffeb58..b6de5dd844a 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -19,6 +19,8 @@
 package org.apache.flink.queryablestate.server;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
 import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
 import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
@@ -81,6 +83,14 @@ public KvStateServerHandler(
 			} else {
 				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
 
+				StateDescriptor<?, ?> requestStateDescriptor = KvStateSerializer.deserializeStateDescriptor(request.getSerializedStateDescriptor());
+				StateDescriptor<?, ?> registStateDescriptor = kvState.getStateDescriptor();
+
+				Preconditions.checkArgument(requestStateDescriptor.getType().equals(registStateDescriptor.getType()),
+					"State type mismatch, need[%s] gotten[%s]", registStateDescriptor.getType(), requestStateDescriptor.getType());
+				Preconditions.checkArgument(requestStateDescriptor.getSerializer().equals(registStateDescriptor.getSerializer()),
+					"State value serializer mismatch, need [%s] gotten[%s]" , registStateDescriptor.getSerializer(), requestStateDescriptor.getSerializer());
+
 				byte[] serializedResult = getSerializedValue(kvState, serializedKeyAndNamespace);
 				if (serializedResult != null) {
 					responseFuture.complete(new KvStateResponse(serializedResult));
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index bceb3615bde..abc112ddd9f 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -158,7 +158,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
 
 			List<CompletableFuture<KvStateResponse>> futures = new ArrayList<>();
 			for (long i = 0L; i < numQueries; i++) {
-				KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+				KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
 				futures.add(client.sendRequest(serverAddress, request));
 			}
 
@@ -277,7 +277,7 @@ public void testRequestUnavailableHost() throws Exception {
 					InetAddress.getLocalHost(),
 					availablePort);
 
-			KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+			KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
 			CompletableFuture<KvStateResponse> future = client.sendRequest(serverAddress, request);
 
 			try {
@@ -356,7 +356,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
 				List<CompletableFuture<KvStateResponse>> results = new ArrayList<>(numQueriesPerTask);
 
 				for (int i = 0; i < numQueriesPerTask; i++) {
-					KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+					KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
 					results.add(finalClient.sendRequest(serverAddress, request));
 				}
 
@@ -446,7 +446,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
 
 			// Requests
 			List<Future<KvStateResponse>> futures = new ArrayList<>();
-			KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+			KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
 
 			futures.add(client.sendRequest(serverAddress, request));
 			futures.add(client.sendRequest(serverAddress, request));
@@ -555,7 +555,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
 			InetSocketAddress serverAddress = getKvStateServerAddress(serverChannel);
 
 			// Requests
-			KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+			KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
 			Future<KvStateResponse> future = client.sendRequest(serverAddress, request);
 
 			while (!received.get() && deadline.hasTimeLeft()) {
@@ -689,7 +689,7 @@ public void testClientServerIntegration() throws Throwable {
 				InternalKvState<Integer, ?, Integer> kvState = (InternalKvState<Integer, ?, Integer>) state;
 
 				// Register KvState (one state instance for all server)
-				ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
+				ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState, desc);
 			}
 
 			final Client<KvStateInternalRequest, KvStateResponse> finalClient = client;
@@ -717,8 +717,10 @@ public void testClientServerIntegration() throws Throwable {
 								IntSerializer.INSTANCE,
 								VoidNamespace.INSTANCE,
 								VoidNamespaceSerializer.INSTANCE);
+						byte[] serializedStateDescriptor = KvStateSerializer.serializedStateDescriptor(desc);
 
-						KvStateInternalRequest request = new KvStateInternalRequest(ids[targetServer], serializedKeyAndNamespace);
+						KvStateInternalRequest request = new KvStateInternalRequest(ids[targetServer],
+																serializedKeyAndNamespace, serializedStateDescriptor);
 						futures.add(finalClient.sendRequest(server[targetServer].getServerAddress(), request));
 					}
 
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index adcf3aefb7c..c131be8b5f5 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.queryablestate.network;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -148,13 +149,14 @@ public void testSimpleQuery() throws Exception {
 				IntSerializer.INSTANCE,
 				VoidNamespace.INSTANCE,
 				VoidNamespaceSerializer.INSTANCE);
+		byte[] serializedStateDescriptor = KvStateSerializer.serializedStateDescriptor(desc);
 
 		long requestId = Integer.MAX_VALUE + 182828L;
 
 		assertTrue(registryListener.registrationName.equals("vanilla"));
 
 		KvStateInternalRequest request = new KvStateInternalRequest(
-				registryListener.kvStateId, serializedKeyAndNamespace);
+				registryListener.kvStateId, serializedKeyAndNamespace, serializedStateDescriptor);
 
 		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
@@ -202,7 +204,7 @@ public void testQueryUnknownKvStateID() throws Exception {
 
 		long requestId = Integer.MAX_VALUE + 182828L;
 
-		KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+		KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
 
 		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
@@ -259,12 +261,13 @@ public void testQueryUnknownKey() throws Exception {
 				IntSerializer.INSTANCE,
 				VoidNamespace.INSTANCE,
 				VoidNamespaceSerializer.INSTANCE);
+		byte[] serializedStateDescriptor = KvStateSerializer.serializedStateDescriptor(desc);
 
 		long requestId = Integer.MAX_VALUE + 22982L;
 
 		assertTrue(registryListener.registrationName.equals("vanilla"));
 
-		KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
+		KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace, serializedStateDescriptor);
 		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
 		// Write the request and wait for the response
@@ -300,51 +303,18 @@ public void testFailureOnGetSerializedValue() throws Exception {
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
 		// Failing KvState
-		InternalKvState<Integer, VoidNamespace, Long> kvState =
-				new InternalKvState<Integer, VoidNamespace, Long>() {
-					@Override
-					public TypeSerializer<Integer> getKeySerializer() {
-						return IntSerializer.INSTANCE;
-					}
-
-					@Override
-					public TypeSerializer<VoidNamespace> getNamespaceSerializer() {
-						return VoidNamespaceSerializer.INSTANCE;
-					}
-
-					@Override
-					public TypeSerializer<Long> getValueSerializer() {
-						return LongSerializer.INSTANCE;
-					}
-
-					@Override
-					public void setCurrentNamespace(VoidNamespace namespace) {
-						// do nothing
-					}
-
-					@Override
-					public byte[] getSerializedValue(
-							final byte[] serializedKeyAndNamespace,
-							final TypeSerializer<Integer> safeKeySerializer,
-							final TypeSerializer<VoidNamespace> safeNamespaceSerializer,
-							final TypeSerializer<Long> safeValueSerializer) throws Exception {
-						throw new RuntimeException("Expected test Exception");
-					}
-
-					@Override
-					public void clear() {
-
-					}
-				};
+		InternalKvState<Integer, VoidNamespace, Long> kvState = getFailingState();
+		ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor("test", LongSerializer.INSTANCE);
 
 		KvStateID kvStateId = registry.registerKvState(
 				new JobID(),
 				new JobVertexID(),
 				new KeyGroupRange(0, 0),
 				"vanilla",
-				kvState);
+				kvState,
+				descriptor);
 
-		KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, new byte[0]);
+		KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, new byte[0], KvStateSerializer.serializedStateDescriptor(descriptor));
 		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
 
 		// Write the request and wait for the response
@@ -363,6 +333,92 @@ public void clear() {
 		assertEquals(1L, stats.getNumFailed());
 	}
 
+	/** Tests that querying with a descriptor of a different state type yields a "State type mismatch" failure. */
+	@Test
+	public void testFailureOnCheckStateType() throws Exception {
+		KvStateRegistry registry = new KvStateRegistry();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+			new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
+		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+		// Register the state with a value state descriptor
+		InternalKvState<Integer, VoidNamespace, Long> kvState = getFailingState();
+		ValueStateDescriptor<Long> registDescriptor = new ValueStateDescriptor<>("regist", LongSerializer.INSTANCE);
+
+		KvStateID kvStateId = registry.registerKvState(
+			new JobID(),
+			new JobVertexID(),
+			new KeyGroupRange(0, 0),
+			"vanilla",
+			kvState,
+			registDescriptor);
+
+		ListStateDescriptor<Long> requestDescriptor = new ListStateDescriptor<>("test", LongSerializer.INSTANCE);
+		KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, new byte[0], KvStateSerializer.serializedStateDescriptor(requestDescriptor));
+		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
+
+		// Write the request and wait for the response
+		channel.writeInbound(serRequest);
+
+		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+		buf.skipBytes(4); // skip frame length
+
+		// Verify the response
+		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+		RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+		assertTrue(response.getCause().getMessage().contains("State type mismatch"));
+
+		assertEquals(1L, stats.getNumRequests());
+		assertEquals(1L, stats.getNumFailed());
+	}
+
+	/** Tests that querying with a descriptor whose serializer differs from the registered one yields a failure. */
+	@Test
+	public void testFailureOnCheckStateValueType() throws Exception {
+		KvStateRegistry registry = new KvStateRegistry();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
+			new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
+
+		KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
+		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+
+		// Register the state with a descriptor that uses the Long serializer
+		InternalKvState<Integer, VoidNamespace, Long> kvState = getFailingState();
+
+		ValueStateDescriptor<Long> registDescriptor = new ValueStateDescriptor<>("test", LongSerializer.INSTANCE);
+		KvStateID kvStateId = registry.registerKvState(
+			new JobID(),
+			new JobVertexID(),
+			new KeyGroupRange(0, 0),
+			"vanilla",
+			kvState,
+			registDescriptor);
+
+		ValueStateDescriptor<Integer> requestDescriptor = new ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
+		KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, new byte[0], KvStateSerializer.serializedStateDescriptor(requestDescriptor));
+		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
+
+		// Write the request and wait for the response
+		channel.writeInbound(serRequest);
+
+		ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+		buf.skipBytes(4); // skip frame length
+
+		// Verify the response
+		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+		RequestFailure response = MessageSerializer.deserializeRequestFailure(buf);
+		assertTrue(response.getCause().getMessage().contains("State value serializer mismatch"));
+
+		assertEquals(1L, stats.getNumRequests());
+		assertEquals(1L, stats.getNumFailed());
+	}
+
 	/**
 	 * Tests that the channel is closed if an Exception reaches the channel handler.
 	 */
@@ -435,7 +491,7 @@ public void testQueryExecutorShutDown() throws Throwable {
 
 		assertTrue(registryListener.registrationName.equals("vanilla"));
 
-		KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, new byte[0]);
+		KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, new byte[0], new byte[0]);
 		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
 
 		// Write the request and wait for the response
@@ -519,7 +575,7 @@ public void testIncomingBufferIsRecycled() throws Exception {
 		KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
-		KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
+		KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
 		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
 
 		assertEquals(1L, serRequest.refCnt());
@@ -588,10 +644,12 @@ public void testSerializerMismatch() throws Exception {
 				IntSerializer.INSTANCE,
 				"wrong-namespace-type",
 				StringSerializer.INSTANCE);
+		byte[] serializedStateDescriptor = KvStateSerializer.serializedStateDescriptor(desc);
 
 		assertTrue(registryListener.registrationName.equals("vanilla"));
 
-		KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, wrongKeyAndNamespace);
+		KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId,
+																	wrongKeyAndNamespace, serializedStateDescriptor);
 		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 182828L, request);
 
 		// Write the request and wait for the response
@@ -607,7 +665,7 @@ public void testSerializerMismatch() throws Exception {
 		assertTrue(response.getCause().getMessage().contains("IOException"));
 
 		// Repeat with wrong namespace only
-		request = new KvStateInternalRequest(registryListener.kvStateId, wrongNamespace);
+		request = new KvStateInternalRequest(registryListener.kvStateId, wrongNamespace, serializedStateDescriptor);
 		serRequest = MessageSerializer.serializeRequest(channel.alloc(), 182829L, request);
 
 		// Write the request and wait for the response
@@ -676,12 +734,13 @@ public void testChunkedResponse() throws Exception {
 				IntSerializer.INSTANCE,
 				VoidNamespace.INSTANCE,
 				VoidNamespaceSerializer.INSTANCE);
+		byte[] serializedStateDescriptor = KvStateSerializer.serializedStateDescriptor(desc);
 
 		long requestId = Integer.MAX_VALUE + 182828L;
 
 		assertTrue(registryListener.registrationName.equals("vanilla"));
 
-		KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
+		KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace, serializedStateDescriptor);
 		ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
 		// Write the request and wait for the response
@@ -765,4 +824,44 @@ public void notifyKvStateUnregistered(JobID jobId,
 			registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()),
 			TtlTimeProvider.DEFAULT);
 	}
+
+	/** Creates an {@link InternalKvState} whose {@code getSerializedValue} always throws a {@link RuntimeException}. */
+	private InternalKvState<Integer, VoidNamespace, Long> getFailingState() {
+		// Anonymous implementation: the serializer getters return fixed instances, the value lookup throws.
+		return new InternalKvState<Integer, VoidNamespace, Long>() {
+			@Override
+			public byte[] getSerializedValue(
+				final byte[] keyAndNamespace,
+				final TypeSerializer<Integer> keySerializer,
+				final TypeSerializer<VoidNamespace> namespaceSerializer,
+				final TypeSerializer<Long> valueSerializer) throws Exception {
+				throw new RuntimeException("Expected test Exception");
+			}
+
+			@Override
+			public TypeSerializer<Integer> getKeySerializer() {
+				return IntSerializer.INSTANCE;
+			}
+
+			@Override
+			public TypeSerializer<VoidNamespace> getNamespaceSerializer() {
+				return VoidNamespaceSerializer.INSTANCE;
+			}
+
+			@Override
+			public TypeSerializer<Long> getValueSerializer() {
+				return LongSerializer.INSTANCE;
+			}
+
+			@Override
+			public void setCurrentNamespace(VoidNamespace namespace) {
+				// do nothing
+			}
+
+			@Override
+			public void clear() {
+				// nothing to clear
+			}
+		};
+	}
 }
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
index 79c23ad2a2d..754aaea2f17 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -145,6 +145,7 @@ public void testSimpleRequest() throws Throwable {
 					IntSerializer.INSTANCE,
 					VoidNamespace.INSTANCE,
 					VoidNamespaceSerializer.INSTANCE);
+			byte[] serializedStateDescriptor = KvStateSerializer.serializedStateDescriptor(desc);
 
 			// Connect to the server
 			final BlockingQueue<ByteBuf> responses = new LinkedBlockingQueue<>();
@@ -167,7 +168,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
 
 			final KvStateInternalRequest request = new KvStateInternalRequest(
 					registryListener.kvStateId,
-					serializedKeyAndNamespace);
+					serializedKeyAndNamespace,
+					serializedStateDescriptor);
 
 			ByteBuf serializeRequest = MessageSerializer.serializeRequest(
 					channel.alloc(),
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
index acaa0671bc0..a36612ff757 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
@@ -64,8 +64,9 @@ public void testRequestSerialization() throws Exception {
 		long requestId = Integer.MAX_VALUE + 1337L;
 		KvStateID kvStateId = new KvStateID();
 		byte[] serializedKeyAndNamespace = randomByteArray(1024);
+		byte[] serializedStateDescriptor = randomByteArray(1024);
 
-		final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+		final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace, serializedStateDescriptor);
 		final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
 				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
 
@@ -91,8 +92,9 @@ public void testRequestSerializationWithZeroLengthKeyAndNamespace() throws Excep
 		long requestId = Integer.MAX_VALUE + 1337L;
 		KvStateID kvStateId = new KvStateID();
 		byte[] serializedKeyAndNamespace = new byte[0];
+		byte[] serializedStateDescriptor = new byte[0];
 
-		final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+		final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace, serializedStateDescriptor);
 		final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
 				new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
 
@@ -115,7 +117,16 @@ public void testRequestSerializationWithZeroLengthKeyAndNamespace() throws Excep
 	 */
 	@Test(expected = NullPointerException.class)
 	public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception {
-		new KvStateInternalRequest(new KvStateID(), null);
+		new KvStateInternalRequest(new KvStateID(), null, new byte[0]);
+	}
+
+	/**
+	 * Tests that we don't try to be smart about a <code>null</code> serialized state
+	 * descriptor: the constructor is expected to fail with a NullPointerException.
+	 */
+	@Test(expected = NullPointerException.class)
+	public void testNullPointerExceptionOnNullSerializedStateDescriptor() throws Exception {
+		new KvStateInternalRequest(new KvStateID(), new byte[0], null);
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
index 0bd132f6e05..e9fdd6633ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
 
@@ -38,13 +39,15 @@
 
 	private final InternalKvState<K, N, V> state;
 	private final KvStateInfo<K, N, V> stateInfo;
+	private final StateDescriptor<?, ?> stateDescriptor;
 
 	private final boolean areSerializersStateless;
 
 	private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> serializerCache;
 
-	public KvStateEntry(final InternalKvState<K, N, V> state) {
+	public KvStateEntry(final InternalKvState<K, N, V> state, final StateDescriptor<?, ?> stateDescriptor) {
 		this.state = Preconditions.checkNotNull(state);
+		this.stateDescriptor = Preconditions.checkNotNull(stateDescriptor);
 		this.stateInfo = new KvStateInfo<>(
 				state.getKeySerializer(),
 				state.getNamespaceSerializer(),
@@ -64,6 +67,10 @@ public KvStateEntry(final InternalKvState<K, N, V> state) {
 				: serializerCache.computeIfAbsent(Thread.currentThread(), t -> stateInfo.duplicate());
 	}
 
+	public StateDescriptor<?, ?> getStateDescriptor() { // wildcard type matches the field; avoids a raw type
+		return this.stateDescriptor; // the descriptor supplied at construction time
+	}
+
 	public void clear() {
 		serializerCache.clear();
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index 430b06bd7d1..14165f41287 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -81,15 +82,16 @@ public void unregisterListener(JobID jobId) {
 	 * @return Assigned KvStateID
 	 */
 	public KvStateID registerKvState(
-			JobID jobId,
-			JobVertexID jobVertexId,
-			KeyGroupRange keyGroupRange,
-			String registrationName,
-			InternalKvState<?, ?, ?> kvState) {
+		JobID jobId,
+		JobVertexID jobVertexId,
+		KeyGroupRange keyGroupRange,
+		String registrationName,
+		InternalKvState<?, ?, ?> kvState,
+		StateDescriptor<?, ?> stateDescriptor) {
 
 		KvStateID kvStateId = new KvStateID();
 
-		if (registeredKvStates.putIfAbsent(kvStateId, new KvStateEntry<>(kvState)) == null) {
+		if (registeredKvStates.putIfAbsent(kvStateId, new KvStateEntry(kvState, stateDescriptor)) == null) {
 			final KvStateRegistryListener listener = getKvStateRegistryListener(jobId);
 
 			if (listener != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
index a44a508ecbc..bfa6d724c0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -58,11 +59,12 @@
 	 * @param registrationName The registration name (not necessarily the same
 	 *                         as the KvState name defined in the state
 	 *                         descriptor used to create the KvState instance)
-	 * @param kvState          The
+	 * @param kvState          The internal KvState instance
+	 * @param stateDescriptor  The descriptor of the state
 	 */
-	public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, InternalKvState<?, ?, ?> kvState) {
-		KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupRange, registrationName, kvState);
-		registeredKvStates.add(new KvStateInfo(keyGroupRange, registrationName, kvStateId));
+	public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, InternalKvState<?, ?, ?> kvState, StateDescriptor<?, ?> stateDescriptor) {
+		KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupRange, registrationName, kvState, stateDescriptor);
+		registeredKvStates.add(new KvStateInfo(keyGroupRange, registrationName, kvStateId, stateDescriptor));
 	}
 
 	/**
@@ -85,7 +87,7 @@ public void unregisterAll() {
 
 		private final KvStateID kvStateId;
 
-		KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
+		KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, StateDescriptor stateDescriptor) {
 			this.keyGroupRange = keyGroupRange;
 			this.registrationName = registrationName;
 			this.kvStateId = kvStateId;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 17d24f77472..a8dc3064bd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -251,7 +251,7 @@ private void publishQueryableStateIfEnabled(
 				throw new IllegalStateException("State backend has not been initialized for job.");
 			}
 			String name = stateDescriptor.getQueryableStateName();
-			kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
+			kvStateRegistry.registerKvState(keyGroupRange, name, kvState, stateDescriptor);
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
index c1c56bf250b..deef7fe49be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
@@ -19,10 +19,12 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.queryablestate.KvStateID;
@@ -77,7 +79,8 @@ public void testKvStateEntry() throws InterruptedException {
 				jobVertexId,
 				keyGroupRange,
 				registrationName,
-				new DummyKvState()
+				new DummyKvState(),
+				getDummyStateDescriptor()
 		);
 
 		final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
@@ -173,7 +176,8 @@ public void testKvStateRegistryListenerNotification() {
 			jobVertexId,
 			keyGroupRange,
 			registrationName,
-			new DummyKvState());
+			new DummyKvState(),
+			getDummyStateDescriptor());
 
 		assertThat(registeredNotifications1.poll(), equalTo(jobId1));
 		assertThat(registeredNotifications2.isEmpty(), is(true));
@@ -186,7 +190,8 @@ public void testKvStateRegistryListenerNotification() {
 			jobVertexId2,
 			keyGroupRange2,
 			registrationName2,
-			new DummyKvState());
+			new DummyKvState(),
+			getDummyStateDescriptor());
 
 		assertThat(registeredNotifications2.poll(), equalTo(jobId2));
 		assertThat(registeredNotifications1.isEmpty(), is(true));
@@ -244,7 +249,8 @@ public void testLegacyCodePathPreference() {
 			jobVertexId,
 			keyGroupRange,
 			registrationName,
-			new DummyKvState());
+			new DummyKvState(),
+			getDummyStateDescriptor());
 
 		assertThat(stateRegistrationNotifications.poll(), equalTo(jobId));
 		// another listener should not have received any notifications
@@ -410,4 +416,9 @@ public TypeSerializerConfigSnapshot snapshotConfiguration() {
 			return null;
 		}
 	}
+
+	/** Creates a String-valued state descriptor for tests that only need some descriptor. */
+	private ValueStateDescriptor<String> getDummyStateDescriptor() {
+		return new ValueStateDescriptor<>("desriptor", StringSerializer.INSTANCE);
+	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services