You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this text copy.)
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/13 11:50:55 UTC
flink git commit: [FLINK-7835][cep] Fix duplicate() in NFASerializer.
Repository: flink
Updated Branches:
refs/heads/master 57333c622 -> ff9cefb36
[FLINK-7835][cep] Fix duplicate() in NFASerializer.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff9cefb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff9cefb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff9cefb3
Branch: refs/heads/master
Commit: ff9cefb36c70a9b6c55f607fc2b56644c57f7057
Parents: 57333c6
Author: kkloudas <kk...@gmail.com>
Authored: Thu Oct 12 15:20:32 2017 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Oct 13 13:49:58 2017 +0200
----------------------------------------------------------------------
.../flink/cep/NonDuplicatingTypeSerializer.java | 5 +-
.../main/java/org/apache/flink/cep/nfa/NFA.java | 154 +------------------
.../org/apache/flink/cep/nfa/SharedBuffer.java | 62 ++++----
.../java/org/apache/flink/cep/nfa/NFATest.java | 5 +-
.../apache/flink/cep/nfa/SharedBufferTest.java | 2 +-
.../flink/cep/operator/CEPOperatorTest.java | 11 ++
6 files changed, 57 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
index f9e13fe..0978aa1 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.IdentityHashMap;
+import java.util.List;
/**
* Type serializer which keeps track of the serialized objects so that each object is only
@@ -53,7 +54,7 @@ public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
private transient IdentityHashMap<T, Integer> identityMap;
// here we store the already deserialized objects
- private transient ArrayList<T> elementList;
+ private transient List<T> elementList;
public NonDuplicatingTypeSerializer(final TypeSerializer<T> typeSerializer) {
this.typeSerializer = typeSerializer;
@@ -82,7 +83,7 @@ public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
@Override
public TypeSerializer<T> duplicate() {
- return new NonDuplicatingTypeSerializer<>(typeSerializer);
+ return new NonDuplicatingTypeSerializer<>(typeSerializer.duplicate());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index ff4967f..7092d73 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -28,9 +28,7 @@ import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.EnumSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.cep.NonDuplicatingTypeSerializer;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
@@ -48,7 +46,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.OptionalDataException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -489,7 +486,6 @@ public class NFA<T> implements Serializable {
}
}
-
/**
* Computes the next computation states based on the given computation state, the current event,
* its timestamp and the internal state machine. The algorithm is:
@@ -793,53 +789,6 @@ public class NFA<T> implements Serializable {
return result;
}
- ////////////////////// Fault-Tolerance //////////////////////
-
- private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
- ois.defaultReadObject();
-
- int numberComputationStates = ois.readInt();
-
- computationStates = new LinkedList<>();
-
- final List<ComputationState<T>> readComputationStates = new ArrayList<>(numberComputationStates);
-
- for (int i = 0; i < numberComputationStates; i++) {
- ComputationState<T> computationState = readComputationState(ois);
- readComputationStates.add(computationState);
- }
-
- this.computationStates.addAll(readComputationStates);
- nonDuplicatingTypeSerializer.clearReferences();
- }
-
- @SuppressWarnings("unchecked")
- private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
- final State<T> state = (State<T>) ois.readObject();
- State<T> previousState;
- try {
- previousState = (State<T>) ois.readObject();
- } catch (OptionalDataException e) {
- previousState = null;
- }
-
- final long timestamp = ois.readLong();
- final DeweyNumber version = (DeweyNumber) ois.readObject();
- final long startTimestamp = ois.readLong();
-
- final boolean hasEvent = ois.readBoolean();
- final T event;
-
- if (hasEvent) {
- DataInputViewStreamWrapper input = new DataInputViewStreamWrapper(ois);
- event = nonDuplicatingTypeSerializer.deserialize(input);
- } else {
- event = null;
- }
-
- return ComputationState.createState(this, state, previousState, event, 0, timestamp, version, startTimestamp);
- }
-
////////////////////// New Serialization //////////////////////
/**
@@ -893,8 +842,8 @@ public class NFA<T> implements Serializable {
}
@Override
- public TypeSerializer<NFA<T>> duplicate() {
- return this;
+ public NFASerializer<T> duplicate() {
+ return new NFASerializer<>(eventSerializer.duplicate());
}
@Override
@@ -906,21 +855,13 @@ public class NFA<T> implements Serializable {
public NFA<T> copy(NFA<T> from) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
-
- serialize(from, new DataOutputViewStreamWrapper(oos));
-
- oos.close();
+ serialize(from, new DataOutputViewStreamWrapper(baos));
baos.close();
byte[] data = baos.toByteArray();
ByteArrayInputStream bais = new ByteArrayInputStream(data);
- ObjectInputStream ois = new ObjectInputStream(bais);
-
- @SuppressWarnings("unchecked")
- NFA<T> copy = deserialize(new DataInputViewStreamWrapper(ois));
- ois.close();
+ NFA<T> copy = deserialize(new DataInputViewStreamWrapper(bais));
bais.close();
return copy;
} catch (IOException e) {
@@ -1236,91 +1177,4 @@ public class NFA<T> implements Serializable {
return null;
}
}
-
- ////////////////// Old Serialization //////////////////////
-
- /**
- * A {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
- */
- public static class Serializer<T> extends TypeSerializerSingleton<NFA<T>> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public NFA<T> createInstance() {
- return null;
- }
-
- @Override
- public NFA<T> copy(NFA<T> from) {
- try {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
-
- oos.writeObject(from);
-
- oos.close();
- baos.close();
-
- byte[] data = baos.toByteArray();
-
- ByteArrayInputStream bais = new ByteArrayInputStream(data);
- ObjectInputStream ois = new ObjectInputStream(bais);
-
- @SuppressWarnings("unchecked")
- NFA<T> copy = (NFA<T>) ois.readObject();
- ois.close();
- bais.close();
- return copy;
- } catch (IOException | ClassNotFoundException e) {
- throw new RuntimeException("Could not copy NFA.", e);
- }
- }
-
- @Override
- public NFA<T> copy(NFA<T> from, NFA<T> reuse) {
- return copy(from);
- }
-
- @Override
- public int getLength() {
- return 0;
- }
-
- @Override
- public void serialize(NFA<T> record, DataOutputView target) throws IOException {
- throw new UnsupportedOperationException("This is the deprecated serialization strategy.");
- }
-
- @Override
- public NFA<T> deserialize(DataInputView source) throws IOException {
- try (ObjectInputStream ois = new ObjectInputStream(new DataInputViewStream(source))) {
- return (NFA<T>) ois.readObject();
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Could not deserialize NFA.", e);
- }
- }
-
- @Override
- public NFA<T> deserialize(NFA<T> reuse, DataInputView source) throws IOException {
- return deserialize(source);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- int size = source.readInt();
- target.writeInt(size);
- target.write(source, size);
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof Serializer;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 6bc5091..0cf47ca 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -38,8 +38,6 @@ import org.apache.commons.lang3.StringUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -829,40 +827,44 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
this.versionSerializer = versionSerializer;
}
+ public TypeSerializer<DeweyNumber> getVersionSerializer() {
+ return versionSerializer;
+ }
+
+ public TypeSerializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ public TypeSerializer<V> getValueSerializer() {
+ return valueSerializer;
+ }
+
@Override
public boolean isImmutableType() {
return false;
}
@Override
- public TypeSerializer<SharedBuffer<K, V>> duplicate() {
- return new SharedBufferSerializer<>(keySerializer, valueSerializer);
+ public SharedBufferSerializer<K, V> duplicate() {
+ return new SharedBufferSerializer<>(keySerializer.duplicate(), valueSerializer.duplicate());
}
@Override
public SharedBuffer<K, V> createInstance() {
- return new SharedBuffer<>(new NonDuplicatingTypeSerializer<V>(valueSerializer));
+ return new SharedBuffer<>(new NonDuplicatingTypeSerializer<>(valueSerializer.duplicate()));
}
@Override
- public SharedBuffer<K, V> copy(SharedBuffer from) {
+ public SharedBuffer<K, V> copy(SharedBuffer<K, V> from) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
-
- serialize(from, new DataOutputViewStreamWrapper(oos));
-
- oos.close();
+ serialize(from, new DataOutputViewStreamWrapper(baos));
baos.close();
byte[] data = baos.toByteArray();
ByteArrayInputStream bais = new ByteArrayInputStream(data);
- ObjectInputStream ois = new ObjectInputStream(bais);
-
- @SuppressWarnings("unchecked")
- SharedBuffer<K, V> copy = deserialize(new DataInputViewStreamWrapper(ois));
- ois.close();
+ SharedBuffer<K, V> copy = deserialize(new DataInputViewStreamWrapper(bais));
bais.close();
return copy;
@@ -872,7 +874,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
}
@Override
- public SharedBuffer<K, V> copy(SharedBuffer from, SharedBuffer reuse) {
+ public SharedBuffer<K, V> copy(SharedBuffer<K, V> from, SharedBuffer<K, V> reuse) {
return copy(from);
}
@@ -882,7 +884,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
}
@Override
- public void serialize(SharedBuffer record, DataOutputView target) throws IOException {
+ public void serialize(SharedBuffer<K, V> record, DataOutputView target) throws IOException {
Map<K, SharedBufferPage<K, V>> pages = record.pages;
Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>();
@@ -955,7 +957,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
}
@Override
- public SharedBuffer deserialize(DataInputView source) throws IOException {
+ public SharedBuffer<K, V> deserialize(DataInputView source) throws IOException {
List<SharedBufferEntry<K, V>> entryList = new ArrayList<>();
Map<K, SharedBufferPage<K, V>> pages = new HashMap<>();
@@ -1013,11 +1015,11 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
// here we put the old NonDuplicating serializer because this needs to create a copy
// of the buffer, as created by the NFA. There, for compatibility reasons, we have left
// the old serializer.
- return new SharedBuffer(new NonDuplicatingTypeSerializer(valueSerializer), pages);
+ return new SharedBuffer<>(new NonDuplicatingTypeSerializer<>(valueSerializer), pages);
}
@Override
- public SharedBuffer deserialize(SharedBuffer reuse, DataInputView source) throws IOException {
+ public SharedBuffer<K, V> deserialize(SharedBuffer<K, V> reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@@ -1068,11 +1070,19 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
@Override
public boolean equals(Object obj) {
- return obj == this ||
- (obj != null && obj.getClass().equals(getClass()) &&
- keySerializer.equals(((SharedBufferSerializer<?, ?>) obj).keySerializer) &&
- valueSerializer.equals(((SharedBufferSerializer<?, ?>) obj).valueSerializer) &&
- versionSerializer.equals(((SharedBufferSerializer<?, ?>) obj).versionSerializer));
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null || !Objects.equals(obj.getClass(), getClass())) {
+ return false;
+ }
+
+ SharedBufferSerializer other = (SharedBufferSerializer) obj;
+ return
+ Objects.equals(keySerializer, other.getKeySerializer()) &&
+ Objects.equals(valueSerializer, other.getValueSerializer()) &&
+ Objects.equals(versionSerializer, other.getVersionSerializer());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 8e739c3..0f4066f 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -310,14 +310,13 @@ public class NFATest extends TestLogger {
NFA.NFASerializer<Event> copySerializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
ByteArrayOutputStream out = new ByteArrayOutputStream();
- copySerializer.copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
+ copySerializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
in.close();
out.close();
// deserialize
ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
- NFA.NFASerializer<Event> deserializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
- NFA<Event> copy = deserializer.deserialize(new DataInputViewStreamWrapper(bais));
+ NFA<Event> copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
bais.close();
assertEquals(nfa, copy);
http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index dfbfa5f..51d27e0 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -160,7 +160,7 @@ public class SharedBufferTest extends TestLogger {
serializer.serialize(sharedBuffer, new DataOutputViewStreamWrapper(baos));
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
- SharedBuffer<String, Event> copy = serializer.deserialize(new DataInputViewStreamWrapper(bais));
+ SharedBuffer<String, Event> copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
assertEquals(sharedBuffer, copy);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff9cefb3/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index ed8b923..4fa6d09 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -259,12 +259,16 @@ public class CEPOperatorTest extends TestLogger {
null,
null,
new PatternSelectFunction<Event, Map<String, List<Event>>>() {
+ private static final long serialVersionUID = -5768297287711394420L;
+
@Override
public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception {
return pattern;
}
},
new PatternTimeoutFunction<Event, Tuple2<Map<String, List<Event>>, Long>>() {
+ private static final long serialVersionUID = 2843329425823093249L;
+
@Override
public Tuple2<Map<String, List<Event>>, Long> timeout(
Map<String, List<Event>> pattern,
@@ -274,6 +278,8 @@ public class CEPOperatorTest extends TestLogger {
},
timedOut
), new KeySelector<Event, Integer>() {
+ private static final long serialVersionUID = 7219185117566268366L;
+
@Override
public Integer getKey(Event value) throws Exception {
return value.getId();
@@ -281,6 +287,11 @@ public class CEPOperatorTest extends TestLogger {
}, BasicTypeInfo.INT_TYPE_INFO);
try {
+ String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
+ RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
+ rocksDBStateBackend.setDbStoragePath(rocksDbPath);
+
+ harness.setStateBackend(rocksDBStateBackend);
harness.setup(
new KryoSerializer<>(
(Class<Map<String, List<Event>>>) (Object) Map.class,