You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/31 01:23:42 UTC
svn commit: r904929 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming: StreamCompletionHandler.java StreamFinishedVerbHandler.java StreamInManager.java
Author: jbellis
Date: Sun Jan 31 00:23:42 2010
New Revision: 904929
URL: http://svn.apache.org/viewvc?rev=904929&view=rev
Log:
Remove unneeded StreamStatusMessage wrapper
patch by jbellis; reviewed by stuhood for CASSANDRA-751
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=904929&r1=904928&r2=904929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java Sun Jan 31 00:23:42 2010
@@ -51,10 +51,8 @@
if (logger.isDebugEnabled())
logger.debug("Sending a streaming finished message with " + streamStatus + " to " + host);
- /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
- StreamInManager.StreamStatusMessage streamStatusMessage = new StreamInManager.StreamStatusMessage(streamStatus);
- Message message = StreamInManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
- MessagingService.instance.sendOneWay(message, host);
+ /* Send a StreamStatus message which may require the source node to re-stream certain files. */
+ MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), host);
/* If we're done with everything for this host, remove from bootstrap sources */
if (StreamInManager.isDone(host) && StorageService.instance.isBootstrapMode())
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=904929&r1=904928&r2=904929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java Sun Jan 31 00:23:42 2010
@@ -23,8 +23,7 @@
try
{
- StreamInManager.StreamStatusMessage streamStatusMessage = StreamInManager.StreamStatusMessage.serializer().deserialize(new DataInputStream(bufIn));
- StreamInManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
+ StreamInManager.StreamStatus streamStatus = StreamInManager.StreamStatus.serializer().deserialize(new DataInputStream(bufIn));
switch (streamStatus.getAction())
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=904929&r1=904928&r2=904929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java Sun Jan 31 00:23:42 2010
@@ -87,6 +87,14 @@
{
return action_;
}
+
+ public Message makeStreamStatusMessage() throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream( bos );
+ StreamStatus.serializer().serialize(this, dos);
+ return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
+ }
}
public static class StreamStatusSerializer implements ICompactSerializer<StreamStatus>
@@ -117,56 +125,7 @@
return streamStatus;
}
}
-
- public static class StreamStatusMessage
- {
- private static ICompactSerializer<StreamStatusMessage> serializer_;
-
- static
- {
- serializer_ = new StreamStatusMessageSerializer();
- }
-
- public static ICompactSerializer<StreamStatusMessage> serializer()
- {
- return serializer_;
- }
-
- public static Message makeStreamStatusMessage(StreamStatusMessage streamStatusMessage) throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream( bos );
- StreamStatusMessage.serializer().serialize(streamStatusMessage, dos);
- return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
- }
-
- protected StreamInManager.StreamStatus streamStatus_;
-
- public StreamStatusMessage(StreamInManager.StreamStatus streamStatus)
- {
- streamStatus_ = streamStatus;
- }
-
- public StreamInManager.StreamStatus getStreamStatus()
- {
- return streamStatus_;
- }
- }
-
- public static class StreamStatusMessageSerializer implements ICompactSerializer<StreamStatusMessage>
- {
- public void serialize(StreamStatusMessage streamStatusMessage, DataOutputStream dos) throws IOException
- {
- StreamStatus.serializer().serialize(streamStatusMessage.streamStatus_, dos);
- }
-
- public StreamStatusMessage deserialize(DataInputStream dis) throws IOException
- {
- StreamInManager.StreamStatus streamStatus = StreamStatus.serializer().deserialize(dis);
- return new StreamStatusMessage(streamStatus);
- }
- }
-
+
/* Maintain a stream context per host that is the source of the stream */
public static final Map<InetAddress, List<InitiatedFile>> ctxBag_ = new Hashtable<InetAddress, List<InitiatedFile>>();
/* Maintain in this map the status of the streams that need to be sent back to the source */