You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/23 09:22:47 UTC

[flink] 02/05: [FLINK-11328] [cep] Snapshots of NFA-related serializers should be a CompositeTypeSerializerSnapshot

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ade21d13478d8d9cf00bd80f2e80729636505522
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Jan 21 18:02:05 2019 +0100

    [FLINK-11328] [cep] Snapshots of NFA-related serializers should be a CompositeTypeSerializerSnapshot
---
 .../java/org/apache/flink/cep/nfa/DeweyNumber.java | 14 +---
 .../apache/flink/cep/nfa/NFAStateSerializer.java   |  2 +-
 .../apache/flink/cep/nfa/sharedbuffer/EventId.java | 16 ++---
 .../apache/flink/cep/nfa/sharedbuffer/NodeId.java  | 75 ++++++++++++++++----
 .../flink/cep/nfa/sharedbuffer/SharedBuffer.java   |  2 +-
 .../cep/nfa/sharedbuffer/SharedBufferEdge.java     | 80 ++++++++++++++++++----
 .../cep/nfa/sharedbuffer/SharedBufferNode.java     | 45 ++++++++++--
 7 files changed, 179 insertions(+), 55 deletions(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index 53fffce..03881f3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -21,7 +21,6 @@ package org.apache.flink.cep.nfa;
 import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -196,8 +195,6 @@ public class DeweyNumber implements Serializable {
 
 		private static final long serialVersionUID = -5086792497034943656L;
 
-		private final IntSerializer elemSerializer = IntSerializer.INSTANCE;
-
 		public static final DeweyNumberSerializer INSTANCE = new DeweyNumberSerializer();
 
 		private DeweyNumberSerializer() {}
@@ -232,7 +229,7 @@ public class DeweyNumber implements Serializable {
 			final int size = record.length();
 			target.writeInt(size);
 			for (int i = 0; i < size; i++) {
-				elemSerializer.serialize(record.deweyNumber[i], target);
+				target.writeInt(record.deweyNumber[i]);
 			}
 		}
 
@@ -241,7 +238,7 @@ public class DeweyNumber implements Serializable {
 			final int size = source.readInt();
 			int[] number = new int[size];
 			for (int i = 0; i < size; i++) {
-				number[i] = elemSerializer.deserialize(source);
+				number[i] = source.readInt();
 			}
 			return new DeweyNumber(number);
 		}
@@ -256,7 +253,7 @@ public class DeweyNumber implements Serializable {
 			final int size = source.readInt();
 			target.writeInt(size);
 			for (int i = 0; i < size; i++) {
-				elemSerializer.copy(source, target);
+				target.writeInt(source.readInt());
 			}
 		}
 
@@ -270,11 +267,6 @@ public class DeweyNumber implements Serializable {
 			return true;
 		}
 
-		@Override
-		public int hashCode() {
-			return elemSerializer.hashCode();
-		}
-
 		// -----------------------------------------------------------------------------------
 
 		@Override
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
index ccbe25c..7b83030 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
@@ -95,7 +95,7 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 	private static final StringSerializer STATE_NAME_SERIALIZER = StringSerializer.INSTANCE;
 	private static final LongSerializer TIMESTAMP_SERIALIZER = LongSerializer.INSTANCE;
 	private static final DeweyNumber.DeweyNumberSerializer VERSION_SERIALIZER = DeweyNumber.DeweyNumberSerializer.INSTANCE;
-	private static final NodeId.NodeIdSerializer NODE_ID_SERIALIZER = NodeId.NodeIdSerializer.INSTANCE;
+	private static final NodeId.NodeIdSerializer NODE_ID_SERIALIZER = new NodeId.NodeIdSerializer();
 	private static final EventId.EventIdSerializer EVENT_ID_SERIALIZER = EventId.EventIdSerializer.INSTANCE;
 
 	@Override
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
index 045cf38..3ac39b5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
@@ -21,8 +21,6 @@ package org.apache.flink.cep.nfa.sharedbuffer;
 import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -117,19 +115,19 @@ public class EventId implements Comparable<EventId> {
 
 		@Override
 		public int getLength() {
-			return 2 * LongSerializer.INSTANCE.getLength();
+			return Integer.BYTES + Long.BYTES;
 		}
 
 		@Override
 		public void serialize(EventId record, DataOutputView target) throws IOException {
-			IntSerializer.INSTANCE.serialize(record.id, target);
-			LongSerializer.INSTANCE.serialize(record.timestamp, target);
+			target.writeInt(record.id);
+			target.writeLong(record.timestamp);
 		}
 
 		@Override
 		public EventId deserialize(DataInputView source) throws IOException {
-			int id = IntSerializer.INSTANCE.deserialize(source);
-			long timestamp = LongSerializer.INSTANCE.deserialize(source);
+			int id = source.readInt();
+			long timestamp = source.readLong();
 
 			return new EventId(id, timestamp);
 		}
@@ -141,8 +139,8 @@ public class EventId implements Comparable<EventId> {
 
 		@Override
 		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			IntSerializer.INSTANCE.copy(source, target);
-			LongSerializer.INSTANCE.copy(source, target);
+			target.writeInt(source.readInt());
+			target.writeLong(source.readLong());
 		}
 
 		@Override
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
index 87dc2c3..2693d4b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
@@ -18,17 +18,20 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.StringValue;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Unique identifier for {@link SharedBufferNode}.
  */
@@ -81,9 +84,19 @@ public class NodeId {
 
 		private static final long serialVersionUID = 9209498028181378582L;
 
-		public static final NodeIdSerializer INSTANCE = new NodeIdSerializer();
+		/**
+		 * NOTE: this field should actually be final.
+		 * The reason that it isn't final is due to backward compatible deserialization
+		 * paths. See {@link #readObject(ObjectInputStream)}.
+		 */
+		private TypeSerializer<EventId> eventIdSerializer;
+
+		public NodeIdSerializer() {
+			this(EventId.EventIdSerializer.INSTANCE);
+		}
 
-		private NodeIdSerializer() {
+		private NodeIdSerializer(TypeSerializer<EventId> eventIdSerializer) {
+			this.eventIdSerializer = checkNotNull(eventIdSerializer);
 		}
 
 		@Override
@@ -115,8 +128,8 @@ public class NodeId {
 		public void serialize(NodeId record, DataOutputView target) throws IOException {
 			if (record != null) {
 				target.writeByte(1);
-				EventId.EventIdSerializer.INSTANCE.serialize(record.eventId, target);
-				StringSerializer.INSTANCE.serialize(record.pageName, target);
+				eventIdSerializer.serialize(record.eventId, target);
+				StringValue.writeString(record.pageName, target);
 			} else {
 				target.writeByte(0);
 			}
@@ -129,8 +142,8 @@ public class NodeId {
 				return null;
 			}
 
-			EventId eventId = EventId.EventIdSerializer.INSTANCE.deserialize(source);
-			String pageName = StringSerializer.INSTANCE.deserialize(source);
+			EventId eventId = eventIdSerializer.deserialize(source);
+			String pageName = StringValue.readString(source);
 			return new NodeId(eventId, pageName);
 		}
 
@@ -143,9 +156,8 @@ public class NodeId {
 		public void copy(DataInputView source, DataOutputView target) throws IOException {
 			target.writeByte(source.readByte());
 
-			LongSerializer.INSTANCE.copy(source, target); // eventId
-			LongSerializer.INSTANCE.copy(source, target); // timestamp
-			StringSerializer.INSTANCE.copy(source, target); // pageName
+			eventIdSerializer.copy(source, target);
+			StringValue.copyString(source, target);
 		}
 
 		@Override
@@ -157,17 +169,50 @@ public class NodeId {
 
 		@Override
 		public TypeSerializerSnapshot<NodeId> snapshotConfiguration() {
-			return new NodeIdSerializerSnapshot();
+			return new NodeIdSerializerSnapshot(this);
 		}
 
 		/**
 		 * Serializer configuration snapshot for compatibility and format evolution.
 		 */
 		@SuppressWarnings("WeakerAccess")
-		public static final class NodeIdSerializerSnapshot extends SimpleTypeSerializerSnapshot<NodeId> {
+		public static final class NodeIdSerializerSnapshot extends CompositeTypeSerializerSnapshot<NodeId, NodeIdSerializer> {
+
+			private static final int VERSION = 1;
 
 			public NodeIdSerializerSnapshot() {
-				super(() -> INSTANCE);
+				super(NodeIdSerializer.class);
+			}
+
+			public NodeIdSerializerSnapshot(NodeIdSerializer nodeIdSerializer) {
+				super(nodeIdSerializer);
+			}
+
+			@Override
+			protected int getCurrentOuterSnapshotVersion() {
+				return VERSION;
+			}
+
+			@Override
+			protected NodeIdSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+				return new NodeIdSerializer((EventId.EventIdSerializer) nestedSerializers[0]);
+			}
+
+			@Override
+			protected TypeSerializer<?>[] getNestedSerializers(NodeIdSerializer outerSerializer) {
+				return new TypeSerializer<?>[]{ outerSerializer.eventIdSerializer };
+			}
+		}
+
+		// ------------------------------------------------------------------------
+
+		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+			in.defaultReadObject();
+
+			if (eventIdSerializer == null) {
+				// the nested serializer will be null if this was read from a savepoint taken with versions
+				// lower than Flink 1.7; in this case, we explicitly create instance for the nested serializer.
+				this.eventIdSerializer = EventId.EventIdSerializer.INSTANCE;
 			}
 		}
 	}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
index 5a37897..19f9d2a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
@@ -78,7 +78,7 @@ public class SharedBuffer<V> {
 		this.entries = stateStore.getMapState(
 			new MapStateDescriptor<>(
 				entriesStateName,
-				NodeId.NodeIdSerializer.INSTANCE,
+				new NodeId.NodeIdSerializer(),
 				new Lockable.LockableTypeSerializer<>(new SharedBufferNode.SharedBufferNodeSerializer())));
 
 		this.eventsCount = stateStore.getMapState(
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
index 2af92f5..d6a95f6 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.cep.nfa.DeweyNumber;
@@ -26,6 +27,9 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Versioned edge in {@link SharedBuffer} that allows retrieving predecessors.
@@ -67,9 +71,24 @@ public class SharedBufferEdge {
 
 		private static final long serialVersionUID = -5122474955050663979L;
 
-		static final SharedBufferEdgeSerializer INSTANCE = new SharedBufferEdgeSerializer();
+		/**
+		 * NOTE: these serializer fields should actually be final.
+		 * The reason that it isn't final is due to backward compatible deserialization
+		 * paths. See {@link #readObject(ObjectInputStream)}.
+		 */
+		private TypeSerializer<NodeId> nodeIdSerializer;
+		private TypeSerializer<DeweyNumber> deweyNumberSerializer;
+
+		public SharedBufferEdgeSerializer() {
+			this(new NodeId.NodeIdSerializer(), DeweyNumber.DeweyNumberSerializer.INSTANCE);
+		}
 
-		private SharedBufferEdgeSerializer() {}
+		private SharedBufferEdgeSerializer(
+				TypeSerializer<NodeId> nodeIdSerializer,
+				TypeSerializer<DeweyNumber> deweyNumberSerializer) {
+			this.nodeIdSerializer = checkNotNull(nodeIdSerializer);
+			this.deweyNumberSerializer = checkNotNull(deweyNumberSerializer);
+		}
 
 		@Override
 		public boolean isImmutableType() {
@@ -98,14 +117,14 @@ public class SharedBufferEdge {
 
 		@Override
 		public void serialize(SharedBufferEdge record, DataOutputView target) throws IOException {
-			NodeId.NodeIdSerializer.INSTANCE.serialize(record.target, target);
-			DeweyNumber.DeweyNumberSerializer.INSTANCE.serialize(record.deweyNumber, target);
+			nodeIdSerializer.serialize(record.target, target);
+			deweyNumberSerializer.serialize(record.deweyNumber, target);
 		}
 
 		@Override
 		public SharedBufferEdge deserialize(DataInputView source) throws IOException {
-			NodeId target = NodeId.NodeIdSerializer.INSTANCE.deserialize(source);
-			DeweyNumber deweyNumber = DeweyNumber.DeweyNumberSerializer.INSTANCE.deserialize(source);
+			NodeId target = nodeIdSerializer.deserialize(source);
+			DeweyNumber deweyNumber = deweyNumberSerializer.deserialize(source);
 			return new SharedBufferEdge(target, deweyNumber);
 		}
 
@@ -116,8 +135,8 @@ public class SharedBufferEdge {
 
 		@Override
 		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			NodeId.NodeIdSerializer.INSTANCE.copy(source, target);
-			DeweyNumber.DeweyNumberSerializer.INSTANCE.copy(source, target);
+			nodeIdSerializer.copy(source, target);
+			deweyNumberSerializer.copy(source, target);
 		}
 
 		@Override
@@ -129,17 +148,54 @@ public class SharedBufferEdge {
 
 		@Override
 		public TypeSerializerSnapshot<SharedBufferEdge> snapshotConfiguration() {
-			return new SharedBufferEdgeSerializerSnapshot();
+			return new SharedBufferEdgeSerializerSnapshot(this);
 		}
 
 		/**
 		 * Serializer configuration snapshot for compatibility and format evolution.
 		 */
 		@SuppressWarnings("WeakerAccess")
-		public static final class SharedBufferEdgeSerializerSnapshot extends SimpleTypeSerializerSnapshot<SharedBufferEdge> {
+		public static final class SharedBufferEdgeSerializerSnapshot
+				extends CompositeTypeSerializerSnapshot<SharedBufferEdge, SharedBufferEdgeSerializer> {
+
+			private static final int VERSION = 1;
 
 			public SharedBufferEdgeSerializerSnapshot() {
-				super(() -> INSTANCE);
+				super(SharedBufferEdgeSerializer.class);
+			}
+
+			public SharedBufferEdgeSerializerSnapshot(SharedBufferEdgeSerializer sharedBufferEdgeSerializer) {
+				super(sharedBufferEdgeSerializer);
+			}
+
+			@Override
+			protected int getCurrentOuterSnapshotVersion() {
+				return VERSION;
+			}
+
+			@Override
+			protected SharedBufferEdgeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+				return new SharedBufferEdgeSerializer(
+					(NodeId.NodeIdSerializer) nestedSerializers[0],
+					(DeweyNumber.DeweyNumberSerializer) nestedSerializers[1]);
+			}
+
+			@Override
+			protected TypeSerializer<?>[] getNestedSerializers(SharedBufferEdgeSerializer outerSerializer) {
+				return new TypeSerializer<?>[] { outerSerializer.nodeIdSerializer, outerSerializer.deweyNumberSerializer };
+			}
+		}
+
+		// ------------------------------------------------------------------------
+
+		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+			in.defaultReadObject();
+
+			if (nodeIdSerializer == null) {
+				// the nested serializers will be null if this was read from a savepoint taken with versions
+				// lower than Flink 1.7; in this case, we explicitly create instances for the nested serializers
+				this.nodeIdSerializer = new NodeId.NodeIdSerializer();
+				this.deweyNumberSerializer = DeweyNumber.DeweyNumberSerializer.INSTANCE;
 			}
 		}
 	}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
index 96afdbb..473a342 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
@@ -30,6 +31,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * An entry in {@link SharedBuffer} that allows to store relations between different entries.
  */
@@ -65,8 +68,15 @@ public class SharedBufferNode {
 
 		private static final long serialVersionUID = -6687780732295439832L;
 
-		private final ListSerializer<SharedBufferEdge> edgesSerializer =
-			new ListSerializer<>(SharedBufferEdgeSerializer.INSTANCE);
+		private final ListSerializer<SharedBufferEdge> edgesSerializer;
+
+		public SharedBufferNodeSerializer() {
+			this.edgesSerializer = new ListSerializer<>(new SharedBufferEdgeSerializer());
+		}
+
+		private SharedBufferNodeSerializer(ListSerializer<SharedBufferEdge> edgesSerializer) {
+			this.edgesSerializer = checkNotNull(edgesSerializer);
+		}
 
 		@Override
 		public boolean isImmutableType() {
@@ -123,17 +133,40 @@ public class SharedBufferNode {
 
 		@Override
 		public TypeSerializerSnapshot<SharedBufferNode> snapshotConfiguration() {
-			return new SharedBufferNodeSerializerSnapshot();
+			return new SharedBufferNodeSerializerSnapshot(this);
 		}
 
 		/**
 		 * Serializer configuration snapshot for compatibility and format evolution.
 		 */
 		@SuppressWarnings("WeakerAccess")
-		public static final class SharedBufferNodeSerializerSnapshot extends SimpleTypeSerializerSnapshot<SharedBufferNode> {
+		public static final class SharedBufferNodeSerializerSnapshot
+				extends CompositeTypeSerializerSnapshot<SharedBufferNode, SharedBufferNodeSerializer> {
+
+			private static final int VERSION = 1;
 
 			public SharedBufferNodeSerializerSnapshot() {
-				super(SharedBufferNodeSerializer::new);
+				super(SharedBufferNodeSerializer.class);
+			}
+
+			public SharedBufferNodeSerializerSnapshot(SharedBufferNodeSerializer sharedBufferNodeSerializer) {
+				super(sharedBufferNodeSerializer);
+			}
+
+			@Override
+			protected int getCurrentOuterSnapshotVersion() {
+				return VERSION;
+			}
+
+			@Override
+			@SuppressWarnings("unchecked")
+			protected SharedBufferNodeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+				return new SharedBufferNodeSerializer((ListSerializer<SharedBufferEdge>) nestedSerializers[0]);
+			}
+
+			@Override
+			protected TypeSerializer<?>[] getNestedSerializers(SharedBufferNodeSerializer outerSerializer) {
+				return new TypeSerializer<?>[]{ outerSerializer.edgesSerializer };
 			}
 		}
 	}