You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC
[44/50] [abbrv] git commit: Get serialization of events off main processing thread
Get serialization of events off main processing thread
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/49dc481a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/49dc481a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/49dc481a
Branch: refs/heads/dev
Commit: 49dc481a73afea8f69d6b9e429d253ae85b5468e
Parents: cf685ab
Author: Bruce Robbins <ro...@thosespent.corp.yahoo.com>
Authored: Thu Nov 17 21:46:11 2011 -0800
Committer: Bruce Robbins <ro...@thosespent.corp.yahoo.com>
Committed: Thu Nov 17 21:46:11 2011 -0800
----------------------------------------------------------------------
.../main/java/io/s4/emitter/CommLayerEmitter.java | 17 +++++++--------
1 file changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/49dc481a/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java b/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
index 4a8b8cf..8318b78 100644
--- a/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
+++ b/s4-core/src/main/java/io/s4/emitter/CommLayerEmitter.java
@@ -115,8 +115,7 @@ public class CommLayerEmitter implements EventEmitter, Runnable {
}
try {
- byte[] rawMessage = serDeser.serialize(eventWrapper);
- MessageHolder mh = new MessageHolder(partitionId, rawMessage);
+ MessageHolder mh = new MessageHolder(partitionId, eventWrapper);
queueMessage(mh);
} catch (RuntimeException rte) {
if (monitor != null) {
@@ -197,7 +196,7 @@ public class CommLayerEmitter implements EventEmitter, Runnable {
isSent = false;
try {
MessageHolder mh = messageQueue.take();
- byte[] rawMessage = mh.getRawMessage();
+ byte[] rawMessage = serDeser.serialize(mh.getEventWrapper());
if (listener == null) {
isSent = sender.send(rawMessage);
} else {
@@ -244,19 +243,19 @@ public class CommLayerEmitter implements EventEmitter, Runnable {
class MessageHolder {
private int partitionId;
- private byte[] rawMessage;
-
- MessageHolder(int partitionId, byte[] rawMessage) {
+ private EventWrapper eventWrapper;
+
+ MessageHolder(int partitionId, EventWrapper eventWrapper) {
this.partitionId = partitionId;
- this.rawMessage = rawMessage;
+ this.eventWrapper = eventWrapper;
}
int getPartitionId() {
return partitionId;
}
- byte[] getRawMessage() {
- return rawMessage;
+ EventWrapper getEventWrapper() {
+ return eventWrapper;
}
}
}