You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC

[44/50] [abbrv] git commit: Get serialization of events off main processing thread

Get serialization of events off main processing thread


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/49dc481a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/49dc481a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/49dc481a

Branch: refs/heads/dev
Commit: 49dc481a73afea8f69d6b9e429d253ae85b5468e
Parents: cf685ab
Author: Bruce Robbins <ro...@thosespent.corp.yahoo.com>
Authored: Thu Nov 17 21:46:11 2011 -0800
Committer: Bruce Robbins <ro...@thosespent.corp.yahoo.com>
Committed: Thu Nov 17 21:46:11 2011 -0800

----------------------------------------------------------------------
 .../main/java/io/s4/emitter/CommLayerEmitter.java  |   17 +++++++--------
 1 files changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dc481a/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java b/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
index 4a8b8cf..8318b78 100644
--- a/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
+++ b/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
@@ -115,8 +115,7 @@ public class CommLayerEmitter implements EventEmitter, Runnable {
         }
 
         try {
-            byte[] rawMessage = serDeser.serialize(eventWrapper);
-            MessageHolder mh = new MessageHolder(partitionId, rawMessage);
+        	MessageHolder mh = new MessageHolder(partitionId, eventWrapper);            
             queueMessage(mh);
         } catch (RuntimeException rte) {
             if (monitor != null) {
@@ -197,7 +196,7 @@ public class CommLayerEmitter implements EventEmitter, Runnable {
             isSent = false;
             try {
                 MessageHolder mh = messageQueue.take();
-                byte[] rawMessage = mh.getRawMessage();
+                byte[] rawMessage = serDeser.serialize(mh.getEventWrapper());
                 if (listener == null) {
                     isSent = sender.send(rawMessage);
                 } else {
@@ -244,19 +243,19 @@ public class CommLayerEmitter implements EventEmitter, Runnable {
 
     class MessageHolder {
         private int partitionId;
-        private byte[] rawMessage;
-
-        MessageHolder(int partitionId, byte[] rawMessage) {
+        private EventWrapper eventWrapper;
+        
+        MessageHolder(int partitionId, EventWrapper eventWrapper) {
             this.partitionId = partitionId;
-            this.rawMessage = rawMessage;
+            this.eventWrapper = eventWrapper;
         }
 
         int getPartitionId() {
             return partitionId;
         }
 
-        byte[] getRawMessage() {
-            return rawMessage;
+        EventWrapper getEventWrapper() {
+            return eventWrapper;
         }
     }
 }