You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2017/12/29 12:55:05 UTC

flink git commit: [FLINK-8227] Optimize the performance of SharedBufferSerializer

Repository: flink
Updated Branches:
  refs/heads/master 91f00ec91 -> c4acbb838


[FLINK-8227] Optimize the performance of SharedBufferSerializer

This closes #5142


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4acbb83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4acbb83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4acbb83

Branch: refs/heads/master
Commit: c4acbb838ffc9582436f976bfb7eaff838a0ed87
Parents: 91f00ec
Author: Dian Fu <fu...@alibaba-inc.com>
Authored: Sat Dec 9 11:51:04 2017 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Fri Dec 29 13:52:37 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/cep/nfa/SharedBuffer.java   | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4acbb83/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 0cf47ca..29e8dc2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -527,6 +527,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		private final Set<SharedBufferEdge<K, V>> edges;
 		private final SharedBufferPage<K, V> page;
 		private int referenceCounter;
+		private transient int entryId;
 
 		SharedBufferEntry(
 				final ValueTimeWrapper<V> valueTime,
@@ -547,6 +548,8 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 			referenceCounter = 0;
 
+			entryId = -1;
+
 			this.page = page;
 		}
 
@@ -886,7 +889,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		@Override
 		public void serialize(SharedBuffer<K, V> record, DataOutputView target) throws IOException {
 			Map<K, SharedBufferPage<K, V>> pages = record.pages;
-			Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>();
 
 			int totalEdges = 0;
 			int entryCounter = 0;
@@ -908,7 +910,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 					// assign id to the sharedBufferEntry for the future
 					// serialization of the previous relation
-					entryIDs.put(sharedBuffer, entryCounter++);
+					sharedBuffer.entryId = entryCounter++;
 
 					ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();
 
@@ -932,15 +934,15 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 				for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
 					SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();
 
-					Integer id = entryIDs.get(sharedBuffer);
-					Preconditions.checkState(id != null, "Could not find id for entry: " + sharedBuffer);
+					int id = sharedBuffer.entryId;
+					Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
 
 					for (SharedBufferEdge<K, V> edge: sharedBuffer.edges) {
 						// in order to serialize the previous relation we simply serialize the ids
 						// of the source and target SharedBufferEntry
 						if (edge.target != null) {
-							Integer targetId = entryIDs.get(edge.getTarget());
-							Preconditions.checkState(targetId != null,
+							int targetId = edge.getTarget().entryId;
+							Preconditions.checkState(targetId != -1,
 									"Could not find id for entry: " + edge.getTarget());
 
 							target.writeInt(id);