You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2017/12/29 12:55:05 UTC
flink git commit: [FLINK-8227] Optimize the performance of SharedBufferSerializer
Repository: flink
Updated Branches:
refs/heads/master 91f00ec91 -> c4acbb838
[FLINK-8227] Optimize the performance of SharedBufferSerializer
This closes #5142
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4acbb83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4acbb83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4acbb83
Branch: refs/heads/master
Commit: c4acbb838ffc9582436f976bfb7eaff838a0ed87
Parents: 91f00ec
Author: Dian Fu <fu...@alibaba-inc.com>
Authored: Sat Dec 9 11:51:04 2017 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Fri Dec 29 13:52:37 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/cep/nfa/SharedBuffer.java | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c4acbb83/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 0cf47ca..29e8dc2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -527,6 +527,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
private final Set<SharedBufferEdge<K, V>> edges;
private final SharedBufferPage<K, V> page;
private int referenceCounter;
+ private transient int entryId;
SharedBufferEntry(
final ValueTimeWrapper<V> valueTime,
@@ -547,6 +548,8 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
referenceCounter = 0;
+ entryId = -1;
+
this.page = page;
}
@@ -886,7 +889,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
@Override
public void serialize(SharedBuffer<K, V> record, DataOutputView target) throws IOException {
Map<K, SharedBufferPage<K, V>> pages = record.pages;
- Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>();
int totalEdges = 0;
int entryCounter = 0;
@@ -908,7 +910,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
// assign id to the sharedBufferEntry for the future
// serialization of the previous relation
- entryIDs.put(sharedBuffer, entryCounter++);
+ sharedBuffer.entryId = entryCounter++;
ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();
@@ -932,15 +934,15 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();
- Integer id = entryIDs.get(sharedBuffer);
- Preconditions.checkState(id != null, "Could not find id for entry: " + sharedBuffer);
+ int id = sharedBuffer.entryId;
+ Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
for (SharedBufferEdge<K, V> edge: sharedBuffer.edges) {
// in order to serialize the previous relation we simply serialize the ids
// of the source and target SharedBufferEntry
if (edge.target != null) {
- Integer targetId = entryIDs.get(edge.getTarget());
- Preconditions.checkState(targetId != null,
+ int targetId = edge.getTarget().entryId;
+ Preconditions.checkState(targetId != -1,
"Could not find id for entry: " + edge.getTarget());
target.writeInt(id);