You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/31 01:23:19 UTC
svn commit: r904927 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/net/io/
src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/dht/
Author: jbellis
Date: Sun Jan 31 00:23:19 2010
New Revision: 904927
URL: http://svn.apache.org/viewvc?rev=904927&view=rev
Log:
rename StreamContextManager -> StreamInManager, StreamManger -> StreamOutManager, extract SCM.StreamContext -> InitiatedFile
patch by jbellis; reviewed by stuhood for CASSANDRA-751
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java (with props)
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java (contents, props changed)
- copied, changed from r904925, incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java (contents, props changed)
- copied, changed from r904925, incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java Sun Jan 31 00:23:19 2010
@@ -25,7 +25,7 @@
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.InitiatedFile;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -51,14 +51,14 @@
return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_INITIATE, bos.toByteArray() );
}
- protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];
+ protected InitiatedFile[] streamContexts_ = new InitiatedFile[0];
- public StreamInitiateMessage(StreamContextManager.StreamContext[] streamContexts)
+ public StreamInitiateMessage(InitiatedFile[] initiatedFiles)
{
- streamContexts_ = streamContexts;
+ streamContexts_ = initiatedFiles;
}
- public StreamContextManager.StreamContext[] getStreamContext()
+ public InitiatedFile[] getStreamContext()
{
return streamContexts_;
}
@@ -69,25 +69,25 @@
public void serialize(StreamInitiateMessage bim, DataOutputStream dos) throws IOException
{
dos.writeInt(bim.streamContexts_.length);
- for ( StreamContextManager.StreamContext streamContext : bim.streamContexts_ )
+ for ( InitiatedFile initiatedFile : bim.streamContexts_ )
{
- StreamContextManager.StreamContext.serializer().serialize(streamContext, dos);
+ InitiatedFile.serializer().serialize(initiatedFile, dos);
}
}
public StreamInitiateMessage deserialize(DataInputStream dis) throws IOException
{
int size = dis.readInt();
- StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[0];
+ InitiatedFile[] initiatedFiles = new InitiatedFile[0];
if ( size > 0 )
{
- streamContexts = new StreamContextManager.StreamContext[size];
+ initiatedFiles = new InitiatedFile[size];
for ( int i = 0; i < size; ++i )
{
- streamContexts[i] = StreamContextManager.StreamContext.serializer().deserialize(dis);
+ initiatedFiles[i] = InitiatedFile.serializer().deserialize(dis);
}
}
- return new StreamInitiateMessage(streamContexts);
+ return new StreamInitiateMessage(initiatedFiles);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java Sun Jan 31 00:23:19 2010
@@ -28,22 +28,23 @@
import org.apache.cassandra.net.FileStreamTask;
import org.apache.cassandra.streaming.IStreamComplete;
-import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.InitiatedFile;
+import org.apache.cassandra.streaming.StreamInManager;
public class IncomingStreamReader
{
private static Logger logger = Logger.getLogger(IncomingStreamReader.class);
- private StreamContextManager.StreamContext streamContext;
- private StreamContextManager.StreamStatus streamStatus;
+ private InitiatedFile initiatedFile;
+ private StreamInManager.StreamStatus streamStatus;
private SocketChannel socketChannel;
public IncomingStreamReader(SocketChannel socketChannel)
{
this.socketChannel = socketChannel;
InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
- streamContext = StreamContextManager.getStreamContext(remoteAddress.getAddress());
- assert streamContext != null;
- streamStatus = StreamContextManager.getStreamStatus(remoteAddress.getAddress());
+ initiatedFile = StreamInManager.getStreamContext(remoteAddress.getAddress());
+ assert initiatedFile != null;
+ streamStatus = StreamInManager.getStreamStatus(remoteAddress.getAddress());
assert streamStatus != null;
}
@@ -51,32 +52,32 @@
{
InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
if (logger.isDebugEnabled())
- logger.debug("Creating file for " + streamContext.getTargetFile());
- FileOutputStream fos = new FileOutputStream(streamContext.getTargetFile(), true);
+ logger.debug("Creating file for " + initiatedFile.getTargetFile());
+ FileOutputStream fos = new FileOutputStream(initiatedFile.getTargetFile(), true);
FileChannel fc = fos.getChannel();
long bytesRead = 0;
try
{
- while (bytesRead < streamContext.getExpectedBytes())
+ while (bytesRead < initiatedFile.getExpectedBytes())
bytesRead += fc.transferFrom(socketChannel, bytesRead, FileStreamTask.CHUNK_SIZE);
}
catch (IOException ex)
{
/* Ask the source node to re-stream this file. */
- streamStatus.setAction(StreamContextManager.StreamCompletionAction.STREAM);
+ streamStatus.setAction(StreamInManager.StreamCompletionAction.STREAM);
handleStreamCompletion(remoteAddress.getAddress());
/* Delete the orphaned file. */
- File file = new File(streamContext.getTargetFile());
+ File file = new File(initiatedFile.getTargetFile());
file.delete();
throw ex;
}
- if (bytesRead == streamContext.getExpectedBytes())
+ if (bytesRead == initiatedFile.getExpectedBytes())
{
if (logger.isDebugEnabled())
{
- logger.debug("Removing stream context " + streamContext);
+ logger.debug("Removing stream context " + initiatedFile);
}
fc.close();
handleStreamCompletion(remoteAddress.getAddress());
@@ -89,8 +90,8 @@
* Streaming is complete. If all the data that has to be received inform the sender via
* the stream completion callback so that the source may perform the requisite cleanup.
*/
- IStreamComplete streamComplete = StreamContextManager.getStreamCompletionHandler(remoteHost);
+ IStreamComplete streamComplete = StreamInManager.getStreamCompletionHandler(remoteHost);
if (streamComplete != null)
- streamComplete.onStreamCompletion(remoteHost, streamContext, streamStatus);
+ streamComplete.onStreamCompletion(remoteHost, initiatedFile, streamStatus);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java Sun Jan 31 00:23:19 2010
@@ -22,7 +22,7 @@
import java.net.InetAddress;
-import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.StreamInManager;
public interface IStreamComplete
{
@@ -30,5 +30,5 @@
* This callback if registered with the StreamContextManager is
* called when the stream from a host is completely handled.
*/
- public void onStreamCompletion(InetAddress from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
+ public void onStreamCompletion(InetAddress from, InitiatedFile initiatedFile, StreamInManager.StreamStatus streamStatus) throws IOException;
}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java?rev=904927&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java Sun Jan 31 00:23:19 2010
@@ -0,0 +1,90 @@
+package org.apache.cassandra.streaming;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+public class InitiatedFile
+{
+ private static ICompactSerializer<InitiatedFile> serializer_;
+
+ static
+ {
+ serializer_ = new InitiatedFileSerializer();
+ }
+
+ public static ICompactSerializer<InitiatedFile> serializer()
+ {
+ return serializer_;
+ }
+
+ private String targetFile_;
+ private long expectedBytes_;
+ private String table_;
+
+ public InitiatedFile(String targetFile, long expectedBytes, String table)
+ {
+ targetFile_ = targetFile;
+ expectedBytes_ = expectedBytes;
+ table_ = table;
+ }
+
+ public String getTable()
+ {
+ return table_;
+ }
+
+ public String getTargetFile()
+ {
+ return targetFile_;
+ }
+
+ public void setTargetFile(String file)
+ {
+ targetFile_ = file;
+ }
+
+ public long getExpectedBytes()
+ {
+ return expectedBytes_;
+ }
+
+ public boolean equals(Object o)
+ {
+ if ( !(o instanceof InitiatedFile) )
+ return false;
+
+ InitiatedFile rhs = (InitiatedFile)o;
+ return targetFile_.equals(rhs.targetFile_);
+ }
+
+ public int hashCode()
+ {
+ return toString().hashCode();
+ }
+
+ public String toString()
+ {
+ return targetFile_ + ":" + expectedBytes_;
+ }
+
+ private static class InitiatedFileSerializer implements ICompactSerializer<InitiatedFile>
+ {
+ public void serialize(InitiatedFile sc, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(sc.targetFile_);
+ dos.writeLong(sc.expectedBytes_);
+ dos.writeUTF(sc.table_);
+ }
+
+ public InitiatedFile deserialize(DataInputStream dis) throws IOException
+ {
+ String targetFile = dis.readUTF();
+ long expectedBytes = dis.readLong();
+ String table = dis.readUTF();
+ return new InitiatedFile(targetFile, expectedBytes, table);
+ }
+ }
+}
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamCompletionHandler.java Sun Jan 31 00:23:19 2010
@@ -12,7 +12,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.IStreamComplete;
-import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.StreamInManager;
import org.apache.cassandra.service.StorageService;
/**
@@ -25,41 +25,41 @@
{
private static Logger logger = Logger.getLogger(StreamCompletionHandler.class);
- public void onStreamCompletion(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
+ public void onStreamCompletion(InetAddress host, InitiatedFile initiatedFile, StreamInManager.StreamStatus streamStatus) throws IOException
{
/* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
- if (streamContext.getTargetFile().contains("-Data.db"))
+ if (initiatedFile.getTargetFile().contains("-Data.db"))
{
- String tableName = streamContext.getTable();
- File file = new File( streamContext.getTargetFile() );
+ String tableName = initiatedFile.getTable();
+ File file = new File( initiatedFile.getTargetFile() );
String fileName = file.getName();
String [] temp = fileName.split("-");
//Open the file to see if all parts are now here
try
{
- SSTableReader sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
+ SSTableReader sstable = SSTableWriter.renameAndOpen(initiatedFile.getTargetFile());
//TODO add a sanity check that this sstable has all its parts and is ok
Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
logger.info("Streaming added " + sstable.getFilename());
}
catch (IOException e)
{
- throw new RuntimeException("Not able to add streamed file " + streamContext.getTargetFile(), e);
+ throw new RuntimeException("Not able to add streamed file " + initiatedFile.getTargetFile(), e);
}
}
if (logger.isDebugEnabled())
logger.debug("Sending a streaming finished message with " + streamStatus + " to " + host);
/* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
- StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
- Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
+ StreamInManager.StreamStatusMessage streamStatusMessage = new StreamInManager.StreamStatusMessage(streamStatus);
+ Message message = StreamInManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
MessagingService.instance.sendOneWay(message, host);
/* If we're done with everything for this host, remove from bootstrap sources */
- if (StreamContextManager.isDone(host) && StorageService.instance.isBootstrapMode())
+ if (StreamInManager.isDone(host) && StorageService.instance.isBootstrapMode())
{
- StorageService.instance.removeBootstrapSource(host, streamContext.getTable());
+ StorageService.instance.removeBootstrapSource(host, initiatedFile.getTable());
}
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java Sun Jan 31 00:23:19 2010
@@ -9,8 +9,8 @@
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamContextManager;
-import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamInManager;
+import org.apache.cassandra.streaming.StreamOutManager;
public class StreamFinishedVerbHandler implements IVerbHandler
{
@@ -23,19 +23,19 @@
try
{
- StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(new DataInputStream(bufIn));
- StreamContextManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
+ StreamInManager.StreamStatusMessage streamStatusMessage = StreamInManager.StreamStatusMessage.serializer().deserialize(new DataInputStream(bufIn));
+ StreamInManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
switch (streamStatus.getAction())
{
case DELETE:
- StreamManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile());
+ StreamOutManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile());
break;
case STREAM:
if (logger.isDebugEnabled())
logger.debug("Need to re-stream file " + streamStatus.getFile());
- StreamManager.get(message.getFrom()).startNext();
+ StreamOutManager.get(message.getFrom()).startNext();
break;
default:
Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java (from r904925, incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java&r1=904925&r2=904927&rev=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamContextManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java Sun Jan 31 00:23:19 2010
@@ -33,9 +33,9 @@
import org.apache.log4j.Logger;
-public class StreamContextManager
+public class StreamInManager
{
- private static Logger logger = Logger.getLogger(StreamContextManager.class);
+ private static final Logger logger = Logger.getLogger(StreamInManager.class);
public static enum StreamCompletionAction
{
@@ -43,89 +43,6 @@
STREAM
}
- public static class StreamContext
- {
- private static ICompactSerializer<StreamContext> serializer_;
-
- static
- {
- serializer_ = new StreamContextSerializer();
- }
-
- public static ICompactSerializer<StreamContext> serializer()
- {
- return serializer_;
- }
-
- private String targetFile_;
- private long expectedBytes_;
- private String table_;
-
- public StreamContext(String targetFile, long expectedBytes, String table)
- {
- targetFile_ = targetFile;
- expectedBytes_ = expectedBytes;
- table_ = table;
- }
-
- public String getTable()
- {
- return table_;
- }
-
- public String getTargetFile()
- {
- return targetFile_;
- }
-
- public void setTargetFile(String file)
- {
- targetFile_ = file;
- }
-
- public long getExpectedBytes()
- {
- return expectedBytes_;
- }
-
- public boolean equals(Object o)
- {
- if ( !(o instanceof StreamContext) )
- return false;
-
- StreamContext rhs = (StreamContext)o;
- return targetFile_.equals(rhs.targetFile_);
- }
-
- public int hashCode()
- {
- return toString().hashCode();
- }
-
- public String toString()
- {
- return targetFile_ + ":" + expectedBytes_;
- }
- }
-
- public static class StreamContextSerializer implements ICompactSerializer<StreamContext>
- {
- public void serialize(StreamContextManager.StreamContext sc, DataOutputStream dos) throws IOException
- {
- dos.writeUTF(sc.targetFile_);
- dos.writeLong(sc.expectedBytes_);
- dos.writeUTF(sc.table_);
- }
-
- public StreamContextManager.StreamContext deserialize(DataInputStream dis) throws IOException
- {
- String targetFile = dis.readUTF();
- long expectedBytes = dis.readLong();
- String table = dis.readUTF();
- return new StreamContext(targetFile, expectedBytes, table);
- }
- }
-
public static class StreamStatus
{
private static ICompactSerializer<StreamStatus> serializer_;
@@ -148,7 +65,7 @@
{
file_ = file;
expectedBytes_ = expectedBytes;
- action_ = StreamContextManager.StreamCompletionAction.DELETE;
+ action_ = StreamInManager.StreamCompletionAction.DELETE;
}
public String getFile()
@@ -161,12 +78,12 @@
return expectedBytes_;
}
- public void setAction(StreamContextManager.StreamCompletionAction action)
+ public void setAction(StreamInManager.StreamCompletionAction action)
{
action_ = action;
}
- public StreamContextManager.StreamCompletionAction getAction()
+ public StreamInManager.StreamCompletionAction getAction()
{
return action_;
}
@@ -223,14 +140,14 @@
return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_FINISHED, bos.toByteArray());
}
- protected StreamContextManager.StreamStatus streamStatus_;
+ protected StreamInManager.StreamStatus streamStatus_;
- public StreamStatusMessage(StreamContextManager.StreamStatus streamStatus)
+ public StreamStatusMessage(StreamInManager.StreamStatus streamStatus)
{
streamStatus_ = streamStatus;
}
- public StreamContextManager.StreamStatus getStreamStatus()
+ public StreamInManager.StreamStatus getStreamStatus()
{
return streamStatus_;
}
@@ -245,27 +162,27 @@
public StreamStatusMessage deserialize(DataInputStream dis) throws IOException
{
- StreamContextManager.StreamStatus streamStatus = StreamStatus.serializer().deserialize(dis);
+ StreamInManager.StreamStatus streamStatus = StreamStatus.serializer().deserialize(dis);
return new StreamStatusMessage(streamStatus);
}
}
/* Maintain a stream context per host that is the source of the stream */
- public static final Map<InetAddress, List<StreamContext>> ctxBag_ = new Hashtable<InetAddress, List<StreamContext>>();
+ public static final Map<InetAddress, List<InitiatedFile>> ctxBag_ = new Hashtable<InetAddress, List<InitiatedFile>>();
/* Maintain in this map the status of the streams that need to be sent back to the source */
public static final Map<InetAddress, List<StreamStatus>> streamStatusBag_ = new Hashtable<InetAddress, List<StreamStatus>>();
/* Maintains a callback handler per endpoint to notify the app that a stream from a given endpoint has been handled */
public static final Map<InetAddress, IStreamComplete> streamNotificationHandlers_ = new HashMap<InetAddress, IStreamComplete>();
- public synchronized static StreamContext getStreamContext(InetAddress key)
+ public synchronized static InitiatedFile getStreamContext(InetAddress key)
{
- List<StreamContext> context = ctxBag_.get(key);
+ List<InitiatedFile> context = ctxBag_.get(key);
if ( context == null )
throw new IllegalStateException("Streaming context has not been set for " + key);
- StreamContext streamContext = context.remove(0);
+ InitiatedFile initiatedFile = context.remove(0);
if ( context.isEmpty() )
ctxBag_.remove(key);
- return streamContext;
+ return initiatedFile;
}
public synchronized static StreamStatus getStreamStatus(InetAddress key)
@@ -303,16 +220,16 @@
streamNotificationHandlers_.put(key, streamComplete);
}
- public synchronized static void addStreamContext(InetAddress key, StreamContext streamContext, StreamStatus streamStatus)
+ public synchronized static void addStreamContext(InetAddress key, InitiatedFile initiatedFile, StreamStatus streamStatus)
{
/* Record the stream context */
- List<StreamContext> context = ctxBag_.get(key);
+ List<InitiatedFile> context = ctxBag_.get(key);
if ( context == null )
{
- context = new ArrayList<StreamContext>();
+ context = new ArrayList<InitiatedFile>();
ctxBag_.put(key, context);
}
- context.add(streamContext);
+ context.add(initiatedFile);
/* Record the stream status for this stream context */
List<StreamStatus> status = streamStatusBag_.get(key);
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateDoneVerbHandler.java Sun Jan 31 00:23:19 2010
@@ -4,7 +4,7 @@
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamOutManager;
public class StreamInitiateDoneVerbHandler implements IVerbHandler
{
@@ -14,6 +14,6 @@
{
if (logger.isDebugEnabled())
logger.debug("Received a stream initiate done message ...");
- StreamManager.get(message.getFrom()).startNext();
+ StreamOutManager.get(message.getFrom()).startNext();
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Sun Jan 31 00:23:19 2010
@@ -16,7 +16,7 @@
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.StreamInManager;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -41,9 +41,9 @@
try
{
StreamInitiateMessage biMsg = StreamInitiateMessage.serializer().deserialize(new DataInputStream(bufIn));
- StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();
+ InitiatedFile[] initiatedFiles = biMsg.getStreamContext();
- if (streamContexts.length == 0)
+ if (initiatedFiles.length == 0)
{
if (logger.isDebugEnabled())
logger.debug("no data needed from " + message.getFrom());
@@ -52,7 +52,7 @@
return;
}
- Map<String, String> fileNames = getNewNames(streamContexts);
+ Map<String, String> fileNames = getNewNames(initiatedFiles);
Map<String, String> pathNames = new HashMap<String, String>();
for (String ssName : fileNames.keySet())
pathNames.put(ssName, DatabaseDescriptor.getNextAvailableDataLocation());
@@ -61,18 +61,18 @@
* generate the new file names and store the new file names
* in the StreamContextManager.
*/
- for (StreamContextManager.StreamContext streamContext : streamContexts )
+ for (InitiatedFile initiatedFile : initiatedFiles)
{
- StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
- String file = getNewFileNameFromOldContextAndNames(fileNames, pathNames, streamContext);
+ StreamInManager.StreamStatus streamStatus = new StreamInManager.StreamStatus(initiatedFile.getTargetFile(), initiatedFile.getExpectedBytes() );
+ String file = getNewFileNameFromOldContextAndNames(fileNames, pathNames, initiatedFile);
if (logger.isDebugEnabled())
- logger.debug("Received Data from : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
- streamContext.setTargetFile(file);
- addStreamContext(message.getFrom(), streamContext, streamStatus);
+ logger.debug("Received Data from : " + message.getFrom() + " " + initiatedFile.getTargetFile() + " " + file);
+ initiatedFile.setTargetFile(file);
+ addStreamContext(message.getFrom(), initiatedFile, streamStatus);
}
- StreamContextManager.registerStreamCompletionHandler(message.getFrom(), new StreamCompletionHandler());
+ StreamInManager.registerStreamCompletionHandler(message.getFrom(), new StreamCompletionHandler());
if (logger.isDebugEnabled())
logger.debug("Sending a stream initiate done message ...");
Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_INITIATE_DONE, new byte[0] );
@@ -86,23 +86,23 @@
public String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames,
Map<String, String> pathNames,
- StreamContextManager.StreamContext streamContext)
+ InitiatedFile initiatedFile)
{
- File sourceFile = new File( streamContext.getTargetFile() );
+ File sourceFile = new File( initiatedFile.getTargetFile() );
String[] piece = FBUtilities.strip(sourceFile.getName(), "-");
String cfName = piece[0];
String ssTableNum = piece[1];
String typeOfFile = piece[2];
- String newFileNameExpanded = fileNames.get(streamContext.getTable() + "-" + cfName + "-" + ssTableNum);
- String path = pathNames.get(streamContext.getTable() + "-" + cfName + "-" + ssTableNum);
+ String newFileNameExpanded = fileNames.get(initiatedFile.getTable() + "-" + cfName + "-" + ssTableNum);
+ String path = pathNames.get(initiatedFile.getTable() + "-" + cfName + "-" + ssTableNum);
//Drop type (Data.db) from new FileName
String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile);
- return path + File.separator + streamContext.getTable() + File.separator + newFileName;
+ return path + File.separator + initiatedFile.getTable() + File.separator + newFileName;
}
// todo: this method needs to be private, or package at the very least for easy unit testing.
- public Map<String, String> getNewNames(StreamContextManager.StreamContext[] streamContexts) throws IOException
+ public Map<String, String> getNewNames(InitiatedFile[] initiatedFiles) throws IOException
{
/*
* Mapping for each file with unique CF-i ---> new file name. For eg.
@@ -113,10 +113,10 @@
Map<String, String> fileNames = new HashMap<String, String>();
/* Get the distinct entries from StreamContexts i.e have one entry per Data/Index/Filter file set */
Set<String> distinctEntries = new HashSet<String>();
- for ( StreamContextManager.StreamContext streamContext : streamContexts )
+ for ( InitiatedFile initiatedFile : initiatedFiles)
{
- String[] pieces = FBUtilities.strip(new File(streamContext.getTargetFile()).getName(), "-");
- distinctEntries.add(streamContext.getTable() + "-" + pieces[0] + "-" + pieces[1] );
+ String[] pieces = FBUtilities.strip(new File(initiatedFile.getTargetFile()).getName(), "-");
+ distinctEntries.add(initiatedFile.getTable() + "-" + pieces[0] + "-" + pieces[1] );
}
/* Generate unique file names per entry */
@@ -136,10 +136,10 @@
return fileNames;
}
- private void addStreamContext(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
+ private void addStreamContext(InetAddress host, InitiatedFile initiatedFile, StreamInManager.StreamStatus streamStatus)
{
if (logger.isDebugEnabled())
- logger.debug("Adding stream context " + streamContext + " for " + host + " ...");
- StreamContextManager.addStreamContext(host, streamContext, streamStatus);
+ logger.debug("Adding stream context " + initiatedFile + " for " + host + " ...");
+ StreamInManager.addStreamContext(host, initiatedFile, streamStatus);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Sun Jan 31 00:23:19 2010
@@ -38,10 +38,9 @@
import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.streaming.StreamContextManager;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamOutManager;
/**
* This class handles streaming data from one node to another.
@@ -121,31 +120,31 @@
*/
public static void transferSSTables(InetAddress target, List<SSTableReader> sstables, String table) throws IOException
{
- StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[SSTable.FILES_ON_DISK * sstables.size()];
+ InitiatedFile[] initiatedFiles = new InitiatedFile[SSTable.FILES_ON_DISK * sstables.size()];
int i = 0;
for (SSTableReader sstable : sstables)
{
for (String filename : sstable.getAllFilenames())
{
File file = new File(filename);
- streamContexts[i++] = new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(), table);
+ initiatedFiles[i++] = new InitiatedFile(file.getAbsolutePath(), file.length(), table);
}
}
if (logger.isDebugEnabled())
- logger.debug("Stream context metadata " + StringUtils.join(streamContexts, ", "));
+ logger.debug("Stream context metadata " + StringUtils.join(initiatedFiles, ", "));
- StreamManager.get(target).addFilesToStream(streamContexts);
- StreamInitiateMessage biMessage = new StreamInitiateMessage(streamContexts);
+ StreamOutManager.get(target).addFilesToStream(initiatedFiles);
+ StreamInitiateMessage biMessage = new StreamInitiateMessage(initiatedFiles);
Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
message.addHeader(StreamOut.TABLE_NAME, table.getBytes());
if (logger.isDebugEnabled())
logger.debug("Sending a stream initiate message to " + target + " ...");
MessagingService.instance.sendOneWay(message, target);
- if (streamContexts.length > 0)
+ if (initiatedFiles.length > 0)
{
logger.info("Waiting for transfer to " + target + " to complete");
- StreamManager.get(target).waitForStreamCompletion();
+ StreamOutManager.get(target).waitForStreamCompletion();
// (StreamManager will delete the streamed file on completion.)
logger.info("Done with transfer to " + target);
}
Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java (from r904925, incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java&r1=904925&r2=904927&rev=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java Sun Jan 31 00:23:19 2010
@@ -28,7 +28,6 @@
import java.net.InetAddress;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.StreamContextManager;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.SimpleCondition;
@@ -38,22 +37,22 @@
/**
* This class manages the streaming of multiple files one after the other.
*/
-public class StreamManager
+public class StreamOutManager
{
- private static Logger logger = Logger.getLogger( StreamManager.class );
+ private static Logger logger = Logger.getLogger( StreamOutManager.class );
- private static ConcurrentMap<InetAddress, StreamManager> streamManagers = new ConcurrentHashMap<InetAddress, StreamManager>();
+ private static ConcurrentMap<InetAddress, StreamOutManager> streamManagers = new ConcurrentHashMap<InetAddress, StreamOutManager>();
- public static StreamManager get(InetAddress to)
+ public static StreamOutManager get(InetAddress to)
{
- StreamManager streamManager = streamManagers.get(to);
- if (streamManager == null)
+ StreamOutManager manager = streamManagers.get(to);
+ if (manager == null)
{
- StreamManager possibleNew = new StreamManager(to);
- if ((streamManager = streamManagers.putIfAbsent(to, possibleNew)) == null)
- streamManager = possibleNew;
+ StreamOutManager possibleNew = new StreamOutManager(to);
+ if ((manager = streamManagers.putIfAbsent(to, possibleNew)) == null)
+ manager = possibleNew;
}
- return streamManager;
+ return manager;
}
private final List<File> files = new ArrayList<File>();
@@ -61,19 +60,19 @@
private long totalBytes = 0L;
private final SimpleCondition condition = new SimpleCondition();
- private StreamManager(InetAddress to)
+ private StreamOutManager(InetAddress to)
{
this.to = to;
}
- public void addFilesToStream(StreamContextManager.StreamContext[] streamContexts)
+ public void addFilesToStream(InitiatedFile[] initiatedFiles)
{
- for (StreamContextManager.StreamContext streamContext : streamContexts)
+ for (InitiatedFile initiatedFile : initiatedFiles)
{
if (logger.isDebugEnabled())
- logger.debug("Adding file " + streamContext.getTargetFile() + " to be streamed.");
- files.add( new File( streamContext.getTargetFile() ) );
- totalBytes += streamContext.getExpectedBytes();
+ logger.debug("Adding file " + initiatedFile.getTargetFile() + " to be streamed.");
+ files.add( new File( initiatedFile.getTargetFile() ) );
+ totalBytes += initiatedFile.getExpectedBytes();
}
}
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java?rev=904927&r1=904926&r2=904927&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java Sun Jan 31 00:23:19 2010
@@ -27,7 +27,7 @@
import java.util.Map;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.streaming.StreamContextManager;
+import org.apache.cassandra.streaming.InitiatedFile;
import org.apache.cassandra.streaming.StreamInitiateVerbHandler;
import org.junit.Test;
@@ -37,12 +37,12 @@
@Test
public void testGetNewNames() throws IOException
{
- StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[3];
- streamContexts[0] = new StreamContextManager.StreamContext("/baz/foo/Standard1-500-Data.db", 100, "Keyspace1");
- streamContexts[1] = new StreamContextManager.StreamContext("/bar/foo/Standard1-500-Index.db", 100, "Keyspace1");
- streamContexts[2] = new StreamContextManager.StreamContext("/bad/foo/Standard1-500-Filter.db", 100, "Keyspace1");
+ InitiatedFile[] initiatedFiles = new InitiatedFile[3];
+ initiatedFiles[0] = new InitiatedFile("/baz/foo/Standard1-500-Data.db", 100, "Keyspace1");
+ initiatedFiles[1] = new InitiatedFile("/bar/foo/Standard1-500-Index.db", 100, "Keyspace1");
+ initiatedFiles[2] = new InitiatedFile("/bad/foo/Standard1-500-Filter.db", 100, "Keyspace1");
StreamInitiateVerbHandler bivh = new StreamInitiateVerbHandler();
- Map<String, String> fileNames = bivh.getNewNames(streamContexts);
+ Map<String, String> fileNames = bivh.getNewNames(initiatedFiles);
Map<String, String> paths = new HashMap<String, String>();
for (String ssName : fileNames.keySet())
paths.put(ssName, DatabaseDescriptor.getNextAvailableDataLocation());
@@ -52,8 +52,8 @@
assertEquals(true, result.contains("Data.db"));
assertEquals(1, fileNames.entrySet().size());
- assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, streamContexts[0])).getName().matches("Standard1-tmp-\\d+-Data.db"));
- assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, streamContexts[1])).getName().matches("Standard1-tmp-\\d+-Index.db"));
- assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, streamContexts[2])).getName().matches("Standard1-tmp-\\d+-Filter.db"));
+ assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, initiatedFiles[0])).getName().matches("Standard1-tmp-\\d+-Data.db"));
+ assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, initiatedFiles[1])).getName().matches("Standard1-tmp-\\d+-Index.db"));
+ assertTrue(new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, paths, initiatedFiles[2])).getName().matches("Standard1-tmp-\\d+-Filter.db"));
}
}