You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/31 01:24:12 UTC
svn commit: r904930 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: net/io/ streaming/
Author: jbellis
Date: Sun Jan 31 00:24:12 2010
New Revision: 904930
URL: http://svn.apache.org/viewvc?rev=904930&view=rev
Log:
extract SIM.StreamStatus to CompletedFileStatus
patch by jbellis; reviewed by stuhood for CASSANDRA-751
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java (with props)
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java?rev=904930&r1=904929&r2=904930&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java Sun Jan 31 00:24:12 2010
@@ -27,6 +27,7 @@
import org.apache.log4j.Logger;
import org.apache.cassandra.net.FileStreamTask;
+import org.apache.cassandra.streaming.CompletedFileStatus;
import org.apache.cassandra.streaming.IStreamComplete;
import org.apache.cassandra.streaming.InitiatedFile;
import org.apache.cassandra.streaming.StreamInManager;
@@ -35,7 +36,7 @@
{
private static Logger logger = Logger.getLogger(IncomingStreamReader.class);
private InitiatedFile initiatedFile;
- private StreamInManager.StreamStatus streamStatus;
+ private CompletedFileStatus streamStatus;
private SocketChannel socketChannel;
public IncomingStreamReader(SocketChannel socketChannel)
@@ -65,7 +66,7 @@
catch (IOException ex)
{
/* Ask the source node to re-stream this file. */
- streamStatus.setAction(StreamInManager.StreamCompletionAction.STREAM);
+ streamStatus.setAction(CompletedFileStatus.StreamCompletionAction.STREAM);
handleStreamCompletion(remoteAddress.getAddress());
/* Delete the orphaned file. */
File file = new File(initiatedFile.getTargetFile());
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java?rev=904930&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java Sun Jan 31 00:24:12 2010
@@ -0,0 +1,100 @@
+package org.apache.cassandra.streaming;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CompletedFileStatus
+{
+ private static ICompactSerializer<CompletedFileStatus> serializer_;
+
+ public static enum StreamCompletionAction
+ {
+ DELETE,
+ STREAM
+ }
+
+ static
+ {
+ serializer_ = new CompletedFileStatusSerializer();
+ }
+
+ public static ICompactSerializer<CompletedFileStatus> serializer()
+ {
+ return serializer_;
+ }
+
+ private String file_;
+ private long expectedBytes_;
+ private StreamCompletionAction action_;
+
+ public CompletedFileStatus(String file, long expectedBytes)
+ {
+ file_ = file;
+ expectedBytes_ = expectedBytes;
+ action_ = StreamCompletionAction.DELETE;
+ }
+
+ public String getFile()
+ {
+ return file_;
+ }
+
+ public long getExpectedBytes()
+ {
+ return expectedBytes_;
+ }
+
+ public void setAction(StreamCompletionAction action)
+ {
+ action_ = action;
+ }
+
+ public StreamCompletionAction getAction()
+ {
+ return action_;
+ }
+
+ public Message makeStreamStatusMessage() throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream( bos );
+ CompletedFileStatus.serializer().serialize(this, dos);
+ return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
+ }
+
+ private static class CompletedFileStatusSerializer implements ICompactSerializer<CompletedFileStatus>
+ {
+ public void serialize(CompletedFileStatus streamStatus, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(streamStatus.getFile());
+ dos.writeLong(streamStatus.getExpectedBytes());
+ dos.writeInt(streamStatus.getAction().ordinal());
+ }
+
+ public CompletedFileStatus deserialize(DataInputStream dis) throws IOException
+ {
+ String targetFile = dis.readUTF();
+ long expectedBytes = dis.readLong();
+ CompletedFileStatus streamStatus = new CompletedFileStatus(targetFile, expectedBytes);
+
+ int ordinal = dis.readInt();
+ if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
+ {
+ streamStatus.setAction(StreamCompletionAction.DELETE);
+ }
+ else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
+ {
+ streamStatus.setAction(StreamCompletionAction.STREAM);
+ }
+
+ return streamStatus;
+ }
+ }
+}
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java?rev=904930&r1=904929&r2=904930&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java Sun Jan 31 00:24:12 2010
@@ -22,13 +22,11 @@
import java.net.InetAddress;
-import org.apache.cassandra.streaming.StreamInManager;
-
public interface IStreamComplete
{
/*
* This callback if registered with the StreamContextManager is
* called when the stream from a host is completely handled.
*/
- public void onStreamCompletion(InetAddress from, InitiatedFile initiatedFile, StreamInManager.StreamStatus streamStatus) throws IOException;
+ public void onStreamCompletion(InetAddress from, InitiatedFile initiatedFile, CompletedFileStatus streamStatus) throws IOException;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=904930&r1=904929&r2=904930&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java Sun Jan 31 00:24:12 2010
@@ -9,7 +9,6 @@
import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.SSTableWriter;
-import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.IStreamComplete;
import org.apache.cassandra.streaming.StreamInManager;
@@ -25,7 +24,7 @@
{
private static Logger logger = Logger.getLogger(StreamCompletionHandler.class);
- public void onStreamCompletion(InetAddress host, InitiatedFile initiatedFile, StreamInManager.StreamStatus streamStatus) throws IOException
+ public void onStreamCompletion(InetAddress host, InitiatedFile initiatedFile, CompletedFileStatus streamStatus) throws IOException
{
/* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
if (initiatedFile.getTargetFile().contains("-Data.db"))
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=904930&r1=904929&r2=904930&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java Sun Jan 31 00:24:12 2010
@@ -9,7 +9,6 @@
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamInManager;
import org.apache.cassandra.streaming.StreamOutManager;
public class StreamFinishedVerbHandler implements IVerbHandler
@@ -23,7 +22,7 @@
try
{
- StreamInManager.StreamStatus streamStatus = StreamInManager.StreamStatus.serializer().deserialize(new DataInputStream(bufIn));
+ CompletedFileStatus streamStatus = CompletedFileStatus.serializer().deserialize(new DataInputStream(bufIn));
switch (streamStatus.getAction())
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=904930&r1=904929&r2=904930&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java Sun Jan 31 00:24:12 2010
@@ -18,7 +18,6 @@
package org.apache.cassandra.streaming;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -26,110 +25,18 @@
import java.net.InetAddress;
import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
import org.apache.cassandra.streaming.IStreamComplete;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.log4j.Logger;
public class StreamInManager
{
private static final Logger logger = Logger.getLogger(StreamInManager.class);
-
- public static enum StreamCompletionAction
- {
- DELETE,
- STREAM
- }
-
- public static class StreamStatus
- {
- private static ICompactSerializer<StreamStatus> serializer_;
-
- static
- {
- serializer_ = new StreamStatusSerializer();
- }
-
- public static ICompactSerializer<StreamStatus> serializer()
- {
- return serializer_;
- }
-
- private String file_;
- private long expectedBytes_;
- private StreamCompletionAction action_;
-
- public StreamStatus(String file, long expectedBytes)
- {
- file_ = file;
- expectedBytes_ = expectedBytes;
- action_ = StreamInManager.StreamCompletionAction.DELETE;
- }
-
- public String getFile()
- {
- return file_;
- }
-
- public long getExpectedBytes()
- {
- return expectedBytes_;
- }
-
- public void setAction(StreamInManager.StreamCompletionAction action)
- {
- action_ = action;
- }
-
- public StreamInManager.StreamCompletionAction getAction()
- {
- return action_;
- }
- public Message makeStreamStatusMessage() throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream( bos );
- StreamStatus.serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
- }
- }
-
- public static class StreamStatusSerializer implements ICompactSerializer<StreamStatus>
- {
- public void serialize(StreamStatus streamStatus, DataOutputStream dos) throws IOException
- {
- dos.writeUTF(streamStatus.getFile());
- dos.writeLong(streamStatus.getExpectedBytes());
- dos.writeInt(streamStatus.getAction().ordinal());
- }
-
- public StreamStatus deserialize(DataInputStream dis) throws IOException
- {
- String targetFile = dis.readUTF();
- long expectedBytes = dis.readLong();
- StreamStatus streamStatus = new StreamStatus(targetFile, expectedBytes);
-
- int ordinal = dis.readInt();
- if ( ordinal == StreamCompletionAction.DELETE.ordinal() )
- {
- streamStatus.setAction(StreamCompletionAction.DELETE);
- }
- else if ( ordinal == StreamCompletionAction.STREAM.ordinal() )
- {
- streamStatus.setAction(StreamCompletionAction.STREAM);
- }
-
- return streamStatus;
- }
- }
-
/* Maintain a stream context per host that is the source of the stream */
public static final Map<InetAddress, List<InitiatedFile>> ctxBag_ = new Hashtable<InetAddress, List<InitiatedFile>>();
/* Maintain in this map the status of the streams that need to be sent back to the source */
- public static final Map<InetAddress, List<StreamStatus>> streamStatusBag_ = new Hashtable<InetAddress, List<StreamStatus>>();
+ public static final Map<InetAddress, List<CompletedFileStatus>> streamStatusBag_ = new Hashtable<InetAddress, List<CompletedFileStatus>>();
/* Maintains a callback handler per endpoint to notify the app that a stream from a given endpoint has been handled */
public static final Map<InetAddress, IStreamComplete> streamNotificationHandlers_ = new HashMap<InetAddress, IStreamComplete>();
@@ -144,12 +51,12 @@
return initiatedFile;
}
- public synchronized static StreamStatus getStreamStatus(InetAddress key)
+ public synchronized static CompletedFileStatus getStreamStatus(InetAddress key)
{
- List<StreamStatus> status = streamStatusBag_.get(key);
+ List<CompletedFileStatus> status = streamStatusBag_.get(key);
if ( status == null )
throw new IllegalStateException("Streaming status has not been set for " + key);
- StreamStatus streamStatus = status.remove(0);
+ CompletedFileStatus streamStatus = status.remove(0);
if ( status.isEmpty() )
streamStatusBag_.remove(key);
return streamStatus;
@@ -179,7 +86,7 @@
streamNotificationHandlers_.put(key, streamComplete);
}
- public synchronized static void addStreamContext(InetAddress key, InitiatedFile initiatedFile, StreamStatus streamStatus)
+ public synchronized static void addStreamContext(InetAddress key, InitiatedFile initiatedFile, CompletedFileStatus streamStatus)
{
/* Record the stream context */
List<InitiatedFile> context = ctxBag_.get(key);
@@ -191,10 +98,10 @@
context.add(initiatedFile);
/* Record the stream status for this stream context */
- List<StreamStatus> status = streamStatusBag_.get(key);
+ List<CompletedFileStatus> status = streamStatusBag_.get(key);
if ( status == null )
{
- status = new ArrayList<StreamStatus>();
+ status = new ArrayList<CompletedFileStatus>();
streamStatusBag_.put(key, status);
}
status.add( streamStatus );
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904930&r1=904929&r2=904930&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Sun Jan 31 00:24:12 2010
@@ -63,7 +63,7 @@
*/
for (InitiatedFile initiatedFile : initiatedFiles)
{
- StreamInManager.StreamStatus streamStatus = new StreamInManager.StreamStatus(initiatedFile.getTargetFile(), initiatedFile.getExpectedBytes() );
+ CompletedFileStatus streamStatus = new CompletedFileStatus(initiatedFile.getTargetFile(), initiatedFile.getExpectedBytes() );
String file = getNewFileNameFromOldContextAndNames(fileNames, pathNames, initiatedFile);
if (logger.isDebugEnabled())
@@ -136,7 +136,7 @@
return fileNames;
}
- private void addStreamContext(InetAddress host, InitiatedFile initiatedFile, StreamInManager.StreamStatus streamStatus)
+ private void addStreamContext(InetAddress host, InitiatedFile initiatedFile, CompletedFileStatus streamStatus)
{
if (logger.isDebugEnabled())
logger.debug("Adding stream context " + initiatedFile + " for " + host + " ...");