You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/13 11:50:55 UTC

flink git commit: [FLINK-7835][cep] Fix duplicate() in NFASerializer.

Repository: flink
Updated Branches:
  refs/heads/master 57333c622 -> ff9cefb36


[FLINK-7835][cep] Fix duplicate() in NFASerializer.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff9cefb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff9cefb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff9cefb3

Branch: refs/heads/master
Commit: ff9cefb36c70a9b6c55f607fc2b56644c57f7057
Parents: 57333c6
Author: kkloudas <kk...@gmail.com>
Authored: Thu Oct 12 15:20:32 2017 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Oct 13 13:49:58 2017 +0200

----------------------------------------------------------------------
 .../flink/cep/NonDuplicatingTypeSerializer.java |   5 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 154 +------------------
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  62 ++++----
 .../java/org/apache/flink/cep/nfa/NFATest.java  |   5 +-
 .../apache/flink/cep/nfa/SharedBufferTest.java  |   2 +-
 .../flink/cep/operator/CEPOperatorTest.java     |  11 ++
 6 files changed, 57 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
index f9e13fe..0978aa1 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.IdentityHashMap;
+import java.util.List;
 
 /**
  * Type serializer which keeps track of the serialized objects so that each object is only
@@ -53,7 +54,7 @@ public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
 	private transient IdentityHashMap<T, Integer> identityMap;
 
 	// here we store the already deserialized objects
-	private transient ArrayList<T> elementList;
+	private transient List<T> elementList;
 
 	public NonDuplicatingTypeSerializer(final TypeSerializer<T> typeSerializer) {
 		this.typeSerializer = typeSerializer;
@@ -82,7 +83,7 @@ public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
 
 	@Override
 	public TypeSerializer<T> duplicate() {
-		return new NonDuplicatingTypeSerializer<>(typeSerializer);
+		return new NonDuplicatingTypeSerializer<>(typeSerializer.duplicate());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index ff4967f..7092d73 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -28,9 +28,7 @@ import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.EnumSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
@@ -48,7 +46,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.io.OptionalDataException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -489,7 +486,6 @@ public class NFA<T> implements Serializable {
 		}
 	}
 
-
 	/**
 	 * Computes the next computation states based on the given computation state, the current event,
 	 * its timestamp and the internal state machine. The algorithm is:
@@ -793,53 +789,6 @@ public class NFA<T> implements Serializable {
 		return result;
 	}
 
-	//////////////////////			Fault-Tolerance			//////////////////////
-
-	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-		ois.defaultReadObject();
-
-		int numberComputationStates = ois.readInt();
-
-		computationStates = new LinkedList<>();
-
-		final List<ComputationState<T>> readComputationStates = new ArrayList<>(numberComputationStates);
-
-		for (int i = 0; i < numberComputationStates; i++) {
-			ComputationState<T> computationState = readComputationState(ois);
-			readComputationStates.add(computationState);
-		}
-
-		this.computationStates.addAll(readComputationStates);
-		nonDuplicatingTypeSerializer.clearReferences();
-	}
-
-	@SuppressWarnings("unchecked")
-	private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-		final State<T> state = (State<T>) ois.readObject();
-		State<T> previousState;
-		try {
-			previousState = (State<T>) ois.readObject();
-		} catch (OptionalDataException e) {
-			previousState = null;
-		}
-
-		final long timestamp = ois.readLong();
-		final DeweyNumber version = (DeweyNumber) ois.readObject();
-		final long startTimestamp = ois.readLong();
-
-		final boolean hasEvent = ois.readBoolean();
-		final T event;
-
-		if (hasEvent) {
-			DataInputViewStreamWrapper input = new DataInputViewStreamWrapper(ois);
-			event = nonDuplicatingTypeSerializer.deserialize(input);
-		} else {
-			event = null;
-		}
-
-		return ComputationState.createState(this, state, previousState, event, 0, timestamp, version, startTimestamp);
-	}
-
 	//////////////////////			New Serialization			//////////////////////
 
 	/**
@@ -893,8 +842,8 @@ public class NFA<T> implements Serializable {
 		}
 
 		@Override
-		public TypeSerializer<NFA<T>> duplicate() {
-			return this;
+		public NFASerializer<T> duplicate() {
+			return new NFASerializer<>(eventSerializer.duplicate());
 		}
 
 		@Override
@@ -906,21 +855,13 @@ public class NFA<T> implements Serializable {
 		public NFA<T> copy(NFA<T> from) {
 			try {
 				ByteArrayOutputStream baos = new ByteArrayOutputStream();
-				ObjectOutputStream oos = new ObjectOutputStream(baos);
-
-				serialize(from, new DataOutputViewStreamWrapper(oos));
-
-				oos.close();
+				serialize(from, new DataOutputViewStreamWrapper(baos));
 				baos.close();
 
 				byte[] data = baos.toByteArray();
 
 				ByteArrayInputStream bais = new ByteArrayInputStream(data);
-				ObjectInputStream ois = new ObjectInputStream(bais);
-
-				@SuppressWarnings("unchecked")
-				NFA<T> copy = deserialize(new DataInputViewStreamWrapper(ois));
-				ois.close();
+				NFA<T> copy = deserialize(new DataInputViewStreamWrapper(bais));
 				bais.close();
 				return copy;
 			} catch (IOException e) {
@@ -1236,91 +1177,4 @@ public class NFA<T> implements Serializable {
 			return null;
 		}
 	}
-
-	//////////////////			Old Serialization			//////////////////////
-
-	/**
-	 * A {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
-	 */
-	public static class Serializer<T> extends TypeSerializerSingleton<NFA<T>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean isImmutableType() {
-			return false;
-		}
-
-		@Override
-		public NFA<T> createInstance() {
-			return null;
-		}
-
-		@Override
-		public NFA<T> copy(NFA<T> from) {
-			try {
-				ByteArrayOutputStream baos = new ByteArrayOutputStream();
-				ObjectOutputStream oos = new ObjectOutputStream(baos);
-
-				oos.writeObject(from);
-
-				oos.close();
-				baos.close();
-
-				byte[] data = baos.toByteArray();
-
-				ByteArrayInputStream bais = new ByteArrayInputStream(data);
-				ObjectInputStream ois = new ObjectInputStream(bais);
-
-				@SuppressWarnings("unchecked")
-				NFA<T> copy = (NFA<T>) ois.readObject();
-				ois.close();
-				bais.close();
-				return copy;
-			} catch (IOException | ClassNotFoundException e) {
-				throw new RuntimeException("Could not copy NFA.", e);
-			}
-		}
-
-		@Override
-		public NFA<T> copy(NFA<T> from, NFA<T> reuse) {
-			return copy(from);
-		}
-
-		@Override
-		public int getLength() {
-			return 0;
-		}
-
-		@Override
-		public void serialize(NFA<T> record, DataOutputView target) throws IOException {
-			throw new UnsupportedOperationException("This is the deprecated serialization strategy.");
-		}
-
-		@Override
-		public NFA<T> deserialize(DataInputView source) throws IOException {
-			try (ObjectInputStream ois = new ObjectInputStream(new DataInputViewStream(source))) {
-				return (NFA<T>) ois.readObject();
-			} catch (ClassNotFoundException e) {
-				throw new RuntimeException("Could not deserialize NFA.", e);
-			}
-		}
-
-		@Override
-		public NFA<T> deserialize(NFA<T> reuse, DataInputView source) throws IOException {
-			return deserialize(source);
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			int size = source.readInt();
-			target.writeInt(size);
-			target.write(source, size);
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return obj instanceof Serializer;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 6bc5091..0cf47ca 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -38,8 +38,6 @@ import org.apache.commons.lang3.StringUtils;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -829,40 +827,44 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			this.versionSerializer = versionSerializer;
 		}
 
+		public TypeSerializer<DeweyNumber> getVersionSerializer() {
+			return versionSerializer;
+		}
+
+		public TypeSerializer<K> getKeySerializer() {
+			return keySerializer;
+		}
+
+		public TypeSerializer<V> getValueSerializer() {
+			return valueSerializer;
+		}
+
 		@Override
 		public boolean isImmutableType() {
 			return false;
 		}
 
 		@Override
-		public TypeSerializer<SharedBuffer<K, V>> duplicate() {
-			return new SharedBufferSerializer<>(keySerializer, valueSerializer);
+		public SharedBufferSerializer<K, V> duplicate() {
+			return new SharedBufferSerializer<>(keySerializer.duplicate(), valueSerializer.duplicate());
 		}
 
 		@Override
 		public SharedBuffer<K, V> createInstance() {
-			return new SharedBuffer<>(new NonDuplicatingTypeSerializer<V>(valueSerializer));
+			return new SharedBuffer<>(new NonDuplicatingTypeSerializer<>(valueSerializer.duplicate()));
 		}
 
 		@Override
-		public SharedBuffer<K, V> copy(SharedBuffer from) {
+		public SharedBuffer<K, V> copy(SharedBuffer<K, V> from) {
 			try {
 				ByteArrayOutputStream baos = new ByteArrayOutputStream();
-				ObjectOutputStream oos = new ObjectOutputStream(baos);
-
-				serialize(from, new DataOutputViewStreamWrapper(oos));
-
-				oos.close();
+				serialize(from, new DataOutputViewStreamWrapper(baos));
 				baos.close();
 
 				byte[] data = baos.toByteArray();
 
 				ByteArrayInputStream bais = new ByteArrayInputStream(data);
-				ObjectInputStream ois = new ObjectInputStream(bais);
-
-				@SuppressWarnings("unchecked")
-				SharedBuffer<K, V> copy = deserialize(new DataInputViewStreamWrapper(ois));
-				ois.close();
+				SharedBuffer<K, V> copy = deserialize(new DataInputViewStreamWrapper(bais));
 				bais.close();
 
 				return copy;
@@ -872,7 +874,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		}
 
 		@Override
-		public SharedBuffer<K, V> copy(SharedBuffer from, SharedBuffer reuse) {
+		public SharedBuffer<K, V> copy(SharedBuffer<K, V> from, SharedBuffer<K, V> reuse) {
 			return copy(from);
 		}
 
@@ -882,7 +884,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		}
 
 		@Override
-		public void serialize(SharedBuffer record, DataOutputView target) throws IOException {
+		public void serialize(SharedBuffer<K, V> record, DataOutputView target) throws IOException {
 			Map<K, SharedBufferPage<K, V>> pages = record.pages;
 			Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>();
 
@@ -955,7 +957,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		}
 
 		@Override
-		public SharedBuffer deserialize(DataInputView source) throws IOException {
+		public SharedBuffer<K, V> deserialize(DataInputView source) throws IOException {
 			List<SharedBufferEntry<K, V>> entryList = new ArrayList<>();
 			Map<K, SharedBufferPage<K, V>> pages = new HashMap<>();
 
@@ -1013,11 +1015,11 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			// here we put the old NonDuplicating serializer because this needs to create a copy
 			// of the buffer, as created by the NFA. There, for compatibility reasons, we have left
 			// the old serializer.
-			return new SharedBuffer(new NonDuplicatingTypeSerializer(valueSerializer), pages);
+			return new SharedBuffer<>(new NonDuplicatingTypeSerializer<>(valueSerializer), pages);
 		}
 
 		@Override
-		public SharedBuffer deserialize(SharedBuffer reuse, DataInputView source) throws IOException {
+		public SharedBuffer<K, V> deserialize(SharedBuffer<K, V> reuse, DataInputView source) throws IOException {
 			return deserialize(source);
 		}
 
@@ -1068,11 +1070,19 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 		@Override
 		public boolean equals(Object obj) {
-			return obj == this ||
-					(obj != null && obj.getClass().equals(getClass()) &&
-							keySerializer.equals(((SharedBufferSerializer<?, ?>) obj).keySerializer) &&
-							valueSerializer.equals(((SharedBufferSerializer<?, ?>) obj).valueSerializer) &&
-							versionSerializer.equals(((SharedBufferSerializer<?, ?>) obj).versionSerializer));
+			if (obj == this) {
+				return true;
+			}
+
+			if (obj == null || !Objects.equals(obj.getClass(), getClass())) {
+				return false;
+			}
+
+			SharedBufferSerializer other = (SharedBufferSerializer) obj;
+			return
+					Objects.equals(keySerializer, other.getKeySerializer()) &&
+					Objects.equals(valueSerializer, other.getValueSerializer()) &&
+					Objects.equals(versionSerializer, other.getVersionSerializer());
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 8e739c3..0f4066f 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -310,14 +310,13 @@ public class NFATest extends TestLogger {
 			NFA.NFASerializer<Event> copySerializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
 			ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
 			ByteArrayOutputStream out = new ByteArrayOutputStream();
-			copySerializer.copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
+			copySerializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
 			in.close();
 			out.close();
 
 			// deserialize
 			ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
-			NFA.NFASerializer<Event> deserializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
-			NFA<Event> copy = deserializer.deserialize(new DataInputViewStreamWrapper(bais));
+			NFA<Event> copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
 			bais.close();
 
 			assertEquals(nfa, copy);

http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index dfbfa5f..51d27e0 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -160,7 +160,7 @@ public class SharedBufferTest extends TestLogger {
 		serializer.serialize(sharedBuffer, new DataOutputViewStreamWrapper(baos));
 
 		ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-		SharedBuffer<String, Event> copy = serializer.deserialize(new DataInputViewStreamWrapper(bais));
+		SharedBuffer<String, Event> copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
 
 		assertEquals(sharedBuffer, copy);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index ed8b923..4fa6d09 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -259,12 +259,16 @@ public class CEPOperatorTest extends TestLogger {
 					null,
 					null,
 					new PatternSelectFunction<Event, Map<String, List<Event>>>() {
+						private static final long serialVersionUID = -5768297287711394420L;
+
 						@Override
 						public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception {
 							return pattern;
 						}
 					},
 					new PatternTimeoutFunction<Event, Tuple2<Map<String, List<Event>>, Long>>() {
+						private static final long serialVersionUID = 2843329425823093249L;
+
 						@Override
 						public Tuple2<Map<String, List<Event>>, Long> timeout(
 							Map<String, List<Event>> pattern,
@@ -274,6 +278,8 @@ public class CEPOperatorTest extends TestLogger {
 					},
 					timedOut
 				), new KeySelector<Event, Integer>() {
+				private static final long serialVersionUID = 7219185117566268366L;
+
 				@Override
 				public Integer getKey(Event value) throws Exception {
 					return value.getId();
@@ -281,6 +287,11 @@ public class CEPOperatorTest extends TestLogger {
 			}, BasicTypeInfo.INT_TYPE_INFO);
 
 		try {
+			String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
+			RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
+			rocksDBStateBackend.setDbStoragePath(rocksDbPath);
+
+			harness.setStateBackend(rocksDBStateBackend);
 			harness.setup(
 				new KryoSerializer<>(
 					(Class<Map<String, List<Event>>>) (Object) Map.class,