You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/07 09:38:58 UTC

[GitHub] dawidwys closed pull request #4172: [FLINK-6983] [cep] Do not serialize States with NFA

dawidwys closed pull request #4172: [FLINK-6983] [cep] Do not serialize States with NFA
URL: https://github.com/apache/flink/pull/4172
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one opened
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index a6c5bdeba2f..a1b251cd65e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -116,27 +116,27 @@
 	@Deprecated
 	private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer;
 
-	//////////////////			End of Backwards Compatibility Fields			//////////////////
-
 	/**
-	 * A set of all the valid NFA states, as returned by the
-	 * {@link NFACompiler NFACompiler}.
-	 * These are directly derived from the user-specified pattern.
+	 * @deprecated Used only for backwards compatibility.
 	 */
+	@Deprecated
 	private Set<State<T>> states;
 
 	/**
-	 * The length of a windowed pattern, as specified using the
-	 * {@link org.apache.flink.cep.pattern.Pattern#within(Time)}  Pattern.within(Time)}
-	 * method.
+	 * @deprecated Used only for backwards compatibility.
 	 */
-	private final long windowTime;
+	@Deprecated
+	private long windowTime;
 
 	/**
-	 * A flag indicating if we want timed-out patterns (in case of windowed patterns)
-	 * to be emitted ({@code true}), or silently discarded ({@code false}).
+	 * @deprecated Used only for backwards compatibility.
 	 */
-	private final boolean handleTimeout;
+	@Deprecated
+	private boolean handleTimeout;
+
+	//////////////////			End of Backwards Compatibility Fields			//////////////////
+
+	private transient MetaStates<T> metaStates;
 
 	/**
 	 * Current set of {@link ComputationState computation states} within the state machine.
@@ -159,15 +159,17 @@ public NFA(
 
 		this.eventSerializer = eventSerializer;
 		this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
-		this.windowTime = windowTime;
-		this.handleTimeout = handleTimeout;
+		this.metaStates = new MetaStates<>(windowTime, handleTimeout);
 		this.eventSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
 		this.computationStates = new LinkedList<>();
-		this.states = new HashSet<>();
+	}
+
+	public MetaStates<T> getMetaStates() {
+		return metaStates;
 	}
 
 	public Set<State<T>> getStates() {
-		return states;
+		return metaStates.states;
 	}
 
 	public void addStates(final Collection<State<T>> newStates) {
@@ -177,7 +179,7 @@ public void addStates(final Collection<State<T>> newStates) {
 	}
 
 	public void addState(final State<T> state) {
-		states.add(state);
+		metaStates.states.add(state);
 
 		if (state.isStart()) {
 			computationStates.add(ComputationState.createStartState(this, state));
@@ -221,10 +223,10 @@ public boolean isEmpty() {
 			final Collection<ComputationState<T>> newComputationStates;
 
 			if (!computationState.isStartState() &&
-				windowTime > 0L &&
-				timestamp - computationState.getStartTimestamp() >= windowTime) {
+				metaStates.windowTime > 0L &&
+				timestamp - computationState.getStartTimestamp() >= metaStates.windowTime) {
 
-				if (handleTimeout) {
+				if (metaStates.handleTimeout) {
 					// extract the timed out event pattern
 					Map<String, List<T>> timedoutPattern = extractCurrentMatches(computationState);
 					timeoutResult.add(Tuple2.of(timedoutPattern, timestamp));
@@ -294,8 +296,8 @@ public boolean isEmpty() {
 		}
 
 		// prune shared buffer based on window length
-		if (windowTime > 0L) {
-			long pruningTimestamp = timestamp - windowTime;
+		if (metaStates.windowTime > 0L) {
+			long pruningTimestamp = timestamp - metaStates.windowTime;
 
 			if (pruningTimestamp < timestamp) {
 				// the check is to guard against underflows
@@ -317,8 +319,8 @@ public boolean equals(Object obj) {
 
 			return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
 				eventSharedBuffer.equals(other.eventSharedBuffer) &&
-				states.equals(other.states) &&
-				windowTime == other.windowTime;
+				metaStates.states.equals(other.metaStates.states) &&
+				metaStates.windowTime == other.metaStates.windowTime;
 		} else {
 			return false;
 		}
@@ -326,7 +328,11 @@ public boolean equals(Object obj) {
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(nonDuplicatingTypeSerializer, eventSharedBuffer, states, windowTime);
+		return Objects.hash(
+			nonDuplicatingTypeSerializer,
+			eventSharedBuffer,
+			metaStates.states,
+			metaStates.windowTime);
 	}
 
 	private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) {
@@ -788,8 +794,8 @@ public boolean apply(@Nullable State<T> input) {
 			convertedStates.get(startName),
 			new DeweyNumber(this.startEventCounter)));
 
-		this.states.clear();
-		this.states.addAll(convertedStates.values());
+		this.metaStates = new MetaStates<T>(this.windowTime, this.handleTimeout);
+		this.metaStates.states.addAll(convertedStates.values());
 
 		return computationStates;
 	}
@@ -847,7 +853,493 @@ public int getVersion() {
 	}
 
 	/**
-	 * A {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
+	 * A {@link TypeSerializer} for {@link NFA} that uses Flink's custom Serialization.
+	 */
+	public static class NFASerializerV2<T> extends TypeSerializer<NFA<T>> {
+
+		private static final long serialVersionUID = 2098282423980597010L;
+
+		private final TypeSerializer<SharedBuffer<String, T>> sharedBufferSerializer;
+
+		private final TypeSerializer<T> eventSerializer;
+
+		private final MetaStates<T> metaStates;
+
+		public NFASerializerV2(TypeSerializer<T> typeSerializer, MetaStates<T> metaStates) {
+			this(
+				typeSerializer,
+				new SharedBuffer.SharedBufferSerializer<>(StringSerializer.INSTANCE, typeSerializer),
+				metaStates);
+		}
+
+		public NFASerializerV2(
+				TypeSerializer<T> typeSerializer,
+				TypeSerializer<SharedBuffer<String, T>> sharedBufferSerializer,
+				MetaStates<T> metaStates) {
+			this.eventSerializer = typeSerializer;
+			this.sharedBufferSerializer = sharedBufferSerializer;
+			this.metaStates = metaStates;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<NFA<T>> duplicate() {
+			return this;
+		}
+
+		@Override
+		public NFA<T> createInstance() {
+			return null;
+		}
+
+		@Override
+		public NFA<T> copy(NFA<T> from) {
+			try {
+				ByteArrayOutputStream baos = new ByteArrayOutputStream();
+				ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+				serialize(from, new DataOutputViewStreamWrapper(oos));
+
+				oos.close();
+				baos.close();
+
+				byte[] data = baos.toByteArray();
+
+				ByteArrayInputStream bais = new ByteArrayInputStream(data);
+				ObjectInputStream ois = new ObjectInputStream(bais);
+
+				@SuppressWarnings("unchecked")
+				NFA<T> copy = deserialize(new DataInputViewStreamWrapper(ois));
+				ois.close();
+				bais.close();
+				return copy;
+			} catch (IOException e) {
+				throw new RuntimeException("Could not copy NFA.", e);
+			}
+		}
+
+		@Override
+		public NFA<T> copy(NFA<T> from, NFA<T> reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1;
+		}
+
+		@Override
+		public void serialize(NFA<T> record, DataOutputView target) throws IOException {
+			sharedBufferSerializer.serialize(record.eventSharedBuffer, target);
+
+			target.writeInt(record.computationStates.size());
+
+			StringSerializer stateNameSerializer = StringSerializer.INSTANCE;
+			LongSerializer timestampSerializer = LongSerializer.INSTANCE;
+			DeweyNumber.DeweyNumberSerializer versionSerializer = new DeweyNumber.DeweyNumberSerializer();
+
+			for (ComputationState<T> computationState: record.computationStates) {
+				stateNameSerializer.serialize(computationState.getState().getName(), target);
+				stateNameSerializer.serialize(computationState.getPreviousState() == null
+						? null : computationState.getPreviousState().getName(), target);
+
+				timestampSerializer.serialize(computationState.getTimestamp(), target);
+				versionSerializer.serialize(computationState.getVersion(), target);
+				timestampSerializer.serialize(computationState.getStartTimestamp(), target);
+				target.writeInt(computationState.getCounter());
+
+				if (computationState.getEvent() == null) {
+					target.writeBoolean(false);
+				} else {
+					target.writeBoolean(true);
+					eventSerializer.serialize(computationState.getEvent(), target);
+				}
+			}
+		}
+
+		@Override
+		public NFA<T> deserialize(DataInputView source) throws IOException {
+			NFA<T> nfa = new NFA<>(eventSerializer, metaStates.windowTime, metaStates.handleTimeout);
+			nfa.metaStates.states = metaStates.states;
+
+			nfa.eventSharedBuffer = sharedBufferSerializer.deserialize(source);
+
+			Queue<ComputationState<T>> computationStates = new LinkedList<>();
+			StringSerializer stateNameSerializer = StringSerializer.INSTANCE;
+			LongSerializer timestampSerializer = LongSerializer.INSTANCE;
+			DeweyNumber.DeweyNumberSerializer versionSerializer = new DeweyNumber.DeweyNumberSerializer();
+
+			int computationStateNo = source.readInt();
+			for (int i = 0; i < computationStateNo; i++) {
+				State<T> state = getStateByName(stateNameSerializer.deserialize(source), nfa);
+				State<T> prevState = getStateByName(stateNameSerializer.deserialize(source), nfa);
+				long timestamp = timestampSerializer.deserialize(source);
+				DeweyNumber version = versionSerializer.deserialize(source);
+				long startTimestamp = timestampSerializer.deserialize(source);
+				int counter = source.readInt();
+
+				T event = null;
+				if (source.readBoolean()) {
+					event = eventSerializer.deserialize(source);
+				}
+
+				computationStates.add(ComputationState.createState(
+						nfa, state, prevState, event, counter, timestamp, version, startTimestamp));
+			}
+
+			nfa.computationStates = computationStates;
+			return nfa;
+		}
+
+		private State<T> getStateByName(String name, NFA<T> nfa) {
+			for (State<T> state: nfa.metaStates.states) {
+				if (state.getName().equals(name)) {
+					return state;
+				}
+			}
+			return null;
+		}
+
+		@Override
+		public NFA<T> deserialize(NFA<T> reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			SharedBuffer<String, T> sharedBuffer = sharedBufferSerializer.deserialize(source);
+			sharedBufferSerializer.serialize(sharedBuffer, target);
+
+			StringSerializer stateNameSerializer = StringSerializer.INSTANCE;
+			LongSerializer timestampSerializer = LongSerializer.INSTANCE;
+			DeweyNumber.DeweyNumberSerializer versionSerializer = new DeweyNumber.DeweyNumberSerializer();
+
+			int computationStateNo = source.readInt();
+			target.writeInt(computationStateNo);
+
+			for (int i = 0; i < computationStateNo; i++) {
+				String stateName = stateNameSerializer.deserialize(source);
+				stateNameSerializer.serialize(stateName, target);
+
+				String prevStateName = stateNameSerializer.deserialize(source);
+				stateNameSerializer.serialize(prevStateName, target);
+
+				long timestamp = timestampSerializer.deserialize(source);
+				timestampSerializer.serialize(timestamp, target);
+
+				DeweyNumber version = versionSerializer.deserialize(source);
+				versionSerializer.serialize(version, target);
+
+				long startTimestamp = timestampSerializer.deserialize(source);
+				timestampSerializer.serialize(startTimestamp, target);
+
+				int counter = source.readInt();
+				target.writeInt(counter);
+
+				boolean hasEvent = source.readBoolean();
+				target.writeBoolean(hasEvent);
+				if (hasEvent) {
+					T event = eventSerializer.deserialize(source);
+					eventSerializer.serialize(event, target);
+				}
+			}
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj == this ||
+					(obj != null && obj.getClass().equals(getClass()) &&
+							sharedBufferSerializer.equals(((NFASerializerV2) obj).sharedBufferSerializer) &&
+							eventSerializer.equals(((NFASerializerV2) obj).eventSerializer));
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return true;
+		}
+
+		@Override
+		public int hashCode() {
+			return 37 * sharedBufferSerializer.hashCode() + eventSerializer.hashCode();
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			return new NFASerializerConfigSnapshot<>(eventSerializer, sharedBufferSerializer);
+		}
+
+		@Override
+		public CompatibilityResult<NFA<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			if (configSnapshot instanceof NFASerializerConfigSnapshot) {
+				List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs =
+						((NFASerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+
+				CompatibilityResult<T> eventCompatResult = CompatibilityUtil.resolveCompatibilityResult(
+						serializersAndConfigs.get(0).f0,
+						UnloadableDummyTypeSerializer.class,
+						serializersAndConfigs.get(0).f1,
+						eventSerializer);
+
+				CompatibilityResult<SharedBuffer<String, T>> sharedBufCompatResult =
+						CompatibilityUtil.resolveCompatibilityResult(
+								serializersAndConfigs.get(1).f0,
+								UnloadableDummyTypeSerializer.class,
+								serializersAndConfigs.get(1).f1,
+								sharedBufferSerializer);
+
+				if (!sharedBufCompatResult.isRequiresMigration() && !eventCompatResult.isRequiresMigration()) {
+					return CompatibilityResult.compatible();
+				} else {
+					if (eventCompatResult.getConvertDeserializer() != null &&
+							sharedBufCompatResult.getConvertDeserializer() != null) {
+						return CompatibilityResult.requiresMigration(
+								new NFASerializerV2<>(
+										new TypeDeserializerAdapter<>(eventCompatResult.getConvertDeserializer()),
+										new TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer()),
+										metaStates));
+					}
+				}
+			}
+
+			return CompatibilityResult.requiresMigration();
+		}
+	}
+
+	public static class MetaStates<T> implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		MetaStates(final long windowTime, final boolean handleTimeout) {
+			this(windowTime, handleTimeout, new HashSet<State<T>>());
+		}
+
+		MetaStates(final long windowTime, final boolean handleTimeout, final Set<State<T>> states) {
+			this.windowTime = windowTime;
+			this.handleTimeout = handleTimeout;
+			this.states = states;
+		}
+
+		/**
+		 * A set of all the valid NFA states, as returned by the
+		 * {@link NFACompiler NFACompiler}.
+		 * These are directly derived from the user-specified pattern.
+		 */
+		private Set<State<T>> states;
+
+		/**
+		 * The length of a windowed pattern, as specified using the
+		 * {@link org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)}
+		 * method.
+		 */
+		private final long windowTime;
+
+		/**
+		 * A flag indicating if we want timed-out patterns (in case of windowed patterns)
+		 * to be emitted ({@code true}), or silently discarded ({@code false}).
+		 */
+		private final boolean handleTimeout;
+
+		public static class MetaStatesSerializer<T> extends TypeSerializerSingleton<MetaStates<T>> {
+
+			@Override
+			public boolean isImmutableType() {
+				return false;
+			}
+
+			@Override
+			public MetaStates<T> createInstance() {
+				return null;
+			}
+
+			@Override
+			public MetaStates<T> copy(MetaStates<T> from) {
+				try {
+					ByteArrayOutputStream baos = new ByteArrayOutputStream();
+					ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+					serialize(from, new DataOutputViewStreamWrapper(oos));
+
+					oos.close();
+					baos.close();
+
+					byte[] data = baos.toByteArray();
+
+					ByteArrayInputStream bais = new ByteArrayInputStream(data);
+					ObjectInputStream ois = new ObjectInputStream(bais);
+
+					@SuppressWarnings("unchecked")
+					MetaStates<T> copy = deserialize(new DataInputViewStreamWrapper(ois));
+					ois.close();
+					bais.close();
+					return copy;
+				} catch (IOException e) {
+					throw new RuntimeException("Could not copy NFA.", e);
+				}
+			}
+
+			@Override
+			public MetaStates<T> copy(MetaStates<T> from, MetaStates<T> reuse) {
+				return copy(from);
+			}
+
+			@Override
+			public int getLength() {
+				return -1;
+			}
+
+			@Override
+			public void serialize(MetaStates<T> record, DataOutputView target) throws IOException {
+				serializeStates(record.states, target);
+				target.writeLong(record.windowTime);
+				target.writeBoolean(record.handleTimeout);
+			}
+
+			@Override
+			public MetaStates<T> deserialize(DataInputView source) throws IOException {
+				Set<State<T>> states = deserializeStates(source);
+				long windowTime = source.readLong();
+				boolean handleTimeout = source.readBoolean();
+				return new MetaStates<>(windowTime, handleTimeout, states);
+			}
+
+			@Override
+			public MetaStates<T> deserialize(MetaStates<T> reuse, DataInputView source) throws IOException {
+				return deserialize(source);
+			}
+
+			@Override
+			public void copy(DataInputView source, DataOutputView target) throws IOException {
+				Set<State<T>> states = deserializeStates(source);
+				serializeStates(states, target);
+
+				long windowTime = source.readLong();
+				target.writeLong(windowTime);
+
+				boolean handleTimeout = source.readBoolean();
+				target.writeBoolean(handleTimeout);
+			}
+
+			@Override
+			public boolean canEqual(Object obj) {
+				return true;
+			}
+
+			private void serializeStates(Set<State<T>> states, DataOutputView out) throws IOException {
+				TypeSerializer<String> nameSerializer = StringSerializer.INSTANCE;
+				TypeSerializer<State.StateType> stateTypeSerializer = new EnumSerializer<>(State.StateType.class);
+				TypeSerializer<StateTransitionAction> actionSerializer = new EnumSerializer<>(StateTransitionAction.class);
+
+				out.writeInt(states.size());
+				for (State<T> state: states) {
+					nameSerializer.serialize(state.getName(), out);
+					stateTypeSerializer.serialize(state.getStateType(), out);
+				}
+
+				for (State<T> state: states) {
+					nameSerializer.serialize(state.getName(), out);
+
+					out.writeInt(state.getStateTransitions().size());
+					for (StateTransition<T> transition : state.getStateTransitions()) {
+						nameSerializer.serialize(transition.getSourceState().getName(), out);
+						nameSerializer.serialize(transition.getTargetState().getName(), out);
+						actionSerializer.serialize(transition.getAction(), out);
+
+						serializeCondition(transition.getCondition(), out);
+					}
+				}
+			}
+
+			private Set<State<T>> deserializeStates(DataInputView in) throws IOException {
+				TypeSerializer<String> nameSerializer = StringSerializer.INSTANCE;
+				TypeSerializer<State.StateType> stateTypeSerializer = new EnumSerializer<>(State.StateType.class);
+				TypeSerializer<StateTransitionAction> actionSerializer = new EnumSerializer<>(StateTransitionAction.class);
+
+				final int noOfStates = in.readInt();
+				Map<String, State<T>> states = new HashMap<>(noOfStates);
+
+				for (int i = 0; i < noOfStates; i++) {
+					String stateName = nameSerializer.deserialize(in);
+					State.StateType stateType = stateTypeSerializer.deserialize(in);
+
+					State<T> state = new State<>(stateName, stateType);
+					states.put(stateName, state);
+				}
+
+				for (int i = 0; i < noOfStates; i++) {
+					String srcName = nameSerializer.deserialize(in);
+
+					int noOfTransitions = in.readInt();
+					for (int j = 0; j < noOfTransitions; j++) {
+						String src = nameSerializer.deserialize(in);
+						Preconditions.checkState(src.equals(srcName),
+											"Source Edge names do not match (" + srcName + " - " + src + ").");
+
+						String trgt = nameSerializer.deserialize(in);
+						StateTransitionAction action = actionSerializer.deserialize(in);
+
+						IterativeCondition<T> condition = null;
+						try {
+							condition = deserializeCondition(in);
+						} catch (ClassNotFoundException e) {
+							e.printStackTrace();
+						}
+
+						State<T> srcState = states.get(src);
+						State<T> trgtState = states.get(trgt);
+						srcState.addStateTransition(action, trgtState, condition);
+					}
+
+				}
+				return new HashSet<>(states.values());
+			}
+
+			private void serializeCondition(IterativeCondition<T> condition, DataOutputView out) throws IOException {
+				out.writeBoolean(condition != null);
+				if (condition != null) {
+					ByteArrayOutputStream baos = new ByteArrayOutputStream();
+					ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+					oos.writeObject(condition);
+
+					oos.close();
+					baos.close();
+
+					byte[] serCondition = baos.toByteArray();
+					out.writeInt(serCondition.length);
+					out.write(serCondition);
+				}
+			}
+
+			private IterativeCondition<T> deserializeCondition(DataInputView in) throws IOException, ClassNotFoundException {
+				boolean hasCondition = in.readBoolean();
+				if (hasCondition) {
+					int length = in.readInt();
+
+					byte[] serCondition = new byte[length];
+					in.readFully(serCondition);
+
+					ByteArrayInputStream bais = new ByteArrayInputStream(serCondition);
+					ObjectInputStream ois = new ObjectInputStream(bais);
+
+					IterativeCondition<T> condition = (IterativeCondition<T>) ois.readObject();
+					ois.close();
+					bais.close();
+
+					return condition;
+				}
+				return null;
+			}
+		}
+	}
+
+	//////////////////			Old Serialization			//////////////////////
+
+	/**
+	 * A {@link TypeSerializer} for {@link NFA} that uses Flink's custom Serialization.
 	 */
 	public static class NFASerializer<T> extends TypeSerializer<NFA<T>> {
 
@@ -862,8 +1354,8 @@ public NFASerializer(TypeSerializer<T> typeSerializer) {
 		}
 
 		public NFASerializer(
-				TypeSerializer<T> typeSerializer,
-				TypeSerializer<SharedBuffer<String, T>> sharedBufferSerializer) {
+			TypeSerializer<T> typeSerializer,
+			TypeSerializer<SharedBuffer<String, T>> sharedBufferSerializer) {
 			this.eventSerializer = typeSerializer;
 			this.sharedBufferSerializer = sharedBufferSerializer;
 		}
@@ -936,7 +1428,7 @@ public void serialize(NFA<T> record, DataOutputView target) throws IOException {
 			for (ComputationState<T> computationState: record.computationStates) {
 				stateNameSerializer.serialize(computationState.getState().getName(), target);
 				stateNameSerializer.serialize(computationState.getPreviousState() == null
-						? null : computationState.getPreviousState().getName(), target);
+					                              ? null : computationState.getPreviousState().getName(), target);
 
 				timestampSerializer.serialize(computationState.getTimestamp(), target);
 				versionSerializer.serialize(computationState.getVersion(), target);
@@ -983,7 +1475,7 @@ public void serialize(NFA<T> record, DataOutputView target) throws IOException {
 				}
 
 				computationStates.add(ComputationState.createState(
-						nfa, state, prevState, event, counter, timestamp, version, startTimestamp));
+					nfa, state, prevState, event, counter, timestamp, version, startTimestamp));
 			}
 
 			nfa.computationStates = computationStates;
@@ -1056,9 +1548,9 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
 		@Override
 		public boolean equals(Object obj) {
 			return obj == this ||
-					(obj != null && obj.getClass().equals(getClass()) &&
-							sharedBufferSerializer.equals(((NFASerializer) obj).sharedBufferSerializer) &&
-							eventSerializer.equals(((NFASerializer) obj).eventSerializer));
+			       (obj != null && obj.getClass().equals(getClass()) &&
+			        sharedBufferSerializer.equals(((NFASerializer) obj).sharedBufferSerializer) &&
+			        eventSerializer.equals(((NFASerializer) obj).eventSerializer));
 		}
 
 		@Override
@@ -1080,30 +1572,30 @@ public TypeSerializerConfigSnapshot snapshotConfiguration() {
 		public CompatibilityResult<NFA<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
 			if (configSnapshot instanceof NFASerializerConfigSnapshot) {
 				List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs =
-						((NFASerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+					((NFASerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
 
 				CompatibilityResult<T> eventCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-						serializersAndConfigs.get(0).f0,
-						UnloadableDummyTypeSerializer.class,
-						serializersAndConfigs.get(0).f1,
-						eventSerializer);
+					serializersAndConfigs.get(0).f0,
+					UnloadableDummyTypeSerializer.class,
+					serializersAndConfigs.get(0).f1,
+					eventSerializer);
 
 				CompatibilityResult<SharedBuffer<String, T>> sharedBufCompatResult =
-						CompatibilityUtil.resolveCompatibilityResult(
-								serializersAndConfigs.get(1).f0,
-								UnloadableDummyTypeSerializer.class,
-								serializersAndConfigs.get(1).f1,
-								sharedBufferSerializer);
+					CompatibilityUtil.resolveCompatibilityResult(
+						serializersAndConfigs.get(1).f0,
+						UnloadableDummyTypeSerializer.class,
+						serializersAndConfigs.get(1).f1,
+						sharedBufferSerializer);
 
 				if (!sharedBufCompatResult.isRequiresMigration() && !eventCompatResult.isRequiresMigration()) {
 					return CompatibilityResult.compatible();
 				} else {
 					if (eventCompatResult.getConvertDeserializer() != null &&
-							sharedBufCompatResult.getConvertDeserializer() != null) {
+					    sharedBufCompatResult.getConvertDeserializer() != null) {
 						return CompatibilityResult.requiresMigration(
-								new NFASerializer<>(
-										new TypeDeserializerAdapter<>(eventCompatResult.getConvertDeserializer()),
-										new TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer())));
+							new NFASerializer<>(
+								new TypeDeserializerAdapter<>(eventCompatResult.getConvertDeserializer()),
+								new TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer())));
 					}
 				}
 			}
@@ -1159,7 +1651,7 @@ private void serializeStates(Set<State<T>> states, DataOutputView out) throws IO
 				for (int j = 0; j < noOfTransitions; j++) {
 					String src = nameSerializer.deserialize(in);
 					Preconditions.checkState(src.equals(srcName),
-							"Source Edge names do not match (" + srcName + " - " + src + ").");
+					                         "Source Edge names do not match (" + srcName + " - " + src + ").");
 
 					String trgt = nameSerializer.deserialize(in);
 					StateTransitionAction action = actionSerializer.deserialize(in);
@@ -1218,8 +1710,6 @@ private void serializeCondition(IterativeCondition<T> condition, DataOutputView
 		}
 	}
 
-	//////////////////			Old Serialization			//////////////////////
-
 	/**
 	 * A {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
 	 */
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 2e3aefd0dc7..86a2be552e4 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -19,6 +19,8 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -94,9 +96,11 @@
 	///////////////			State			//////////////
 
 	private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorStateName";
+	private static final String NFA_METASTATES_OPERATOR_STATE_NAME = "nfaMetaStatesOperatorStateName";
 	private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";
 
 	private transient ValueState<NFA<IN>> nfaOperatorState;
+	private transient ListState<NFA.MetaStates<IN>> nfaMetaStatesOperatorState;
 	private transient MapState<Long, List<IN>> elementQueueState;
 
 	private final NFACompiler.NFAFactory<IN> nfaFactory;
@@ -134,11 +138,24 @@ public AbstractKeyedCEPPatternOperator(
 	public void initializeState(StateInitializationContext context) throws Exception {
 		super.initializeState(context);
 
+		if (nfaMetaStatesOperatorState == null) {
+			nfaMetaStatesOperatorState = getOperatorStateBackend().getUnionListState(
+				new ListStateDescriptor<> (
+					NFA_METASTATES_OPERATOR_STATE_NAME,
+					new NFA.MetaStates.MetaStatesSerializer<IN>()));
+		}
+
 		if (nfaOperatorState == null) {
+			NFA.MetaStates<IN> metaStates = getNFAMetaStates();
+			if (metaStates == null) {
+				metaStates = nfaFactory.createNFA().getMetaStates();
+				updateNFAMetaStates(metaStates);
+			}
+
 			nfaOperatorState = getRuntimeContext().getState(
 				new ValueStateDescriptor<>(
-						NFA_OPERATOR_STATE_NAME,
-						new NFA.NFASerializer<>(inputSerializer)));
+					NFA_OPERATOR_STATE_NAME,
+					new NFA.NFASerializerV2<>(inputSerializer, metaStates)));
 		}
 
 		if (elementQueueState == null) {
@@ -264,6 +281,20 @@ private void updateLastSeenWatermark(long timestamp) {
 		this.lastWatermark = timestamp;
 	}
 
+	private NFA.MetaStates<IN> getNFAMetaStates() throws Exception {
+		Iterable<NFA.MetaStates<IN>> listState = nfaMetaStatesOperatorState.get();
+		if (listState != null && listState.iterator().hasNext()) {
+			return listState.iterator().next();
+		} else {
+			return null;
+		}
+	}
+
+	private void updateNFAMetaStates(NFA.MetaStates<IN> metaStates) throws Exception {
+		nfaMetaStatesOperatorState.clear();
+		nfaMetaStatesOperatorState.add(metaStates);
+	}
+
 	private NFA<IN> getNFA() throws IOException {
 		NFA<IN> nfa = nfaOperatorState.value();
 		return nfa != null ? nfa : nfaFactory.createNFA();
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 25863423fbc..ace97d6ce88 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -299,7 +299,8 @@ public boolean filter(Event value) throws Exception {
 			nfa.process(d, 7);
 			nfa.process(a, 8);
 
-			NFA.NFASerializer<Event> serializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+			NFA.NFASerializerV2<Event> serializer =
+				new NFA.NFASerializerV2<>(Event.createTypeSerializer(), nfa.getMetaStates());
 
 			//serialize
 			ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -307,7 +308,8 @@ public boolean filter(Event value) throws Exception {
 			baos.close();
 
 			// copy
-			NFA.NFASerializer<Event> copySerializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+			NFA.NFASerializerV2<Event> copySerializer =
+				new NFA.NFASerializerV2<>(Event.createTypeSerializer(), nfa.getMetaStates());
 			ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
 			ByteArrayOutputStream out = new ByteArrayOutputStream();
 			copySerializer.copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
@@ -316,7 +318,8 @@ public boolean filter(Event value) throws Exception {
 
 			// deserialize
 			ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
-			NFA.NFASerializer<Event> deserializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+			NFA.NFASerializerV2<Event> deserializer =
+				new NFA.NFASerializerV2<>(Event.createTypeSerializer(), nfa.getMetaStates());
 			NFA<Event> copy = deserializer.deserialize(new DataInputViewStreamWrapper(bais));
 			bais.close();
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services