You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/31 01:29:38 UTC
svn commit: r904932 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/
src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/net/io/
src/java/org/apache/cassandra/service/ src/java/org/apac...
Author: jbellis
Date: Sun Jan 31 00:29:37 2010
New Revision: 904932
URL: http://svn.apache.org/viewvc?rev=904932&view=rev
Log:
move IncomingStreamReader, StreamInitiateMessage, and BootstrapTest to streaming package. r/m 'public' modifier from streaming classes that don't need it
patch by jbellis; reviewed by stuhood for CASSANDRA-751
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (contents, props changed)
- copied, changed from r904930, incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java (contents, props changed)
- copied, changed from r904930, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/
incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java (contents, props changed)
- copied, changed from r904930, incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Sun Jan 31 00:29:37 2010
@@ -28,7 +28,6 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.StreamOut;
import org.apache.log4j.Logger;
@@ -415,10 +414,10 @@
Hashtable<InetAddress, Long> copy = new Hashtable<InetAddress, Long>(justRemovedEndPoints_);
for (Map.Entry<InetAddress, Long> entry : copy.entrySet())
{
- if ((now - entry.getValue()) > StreamOut.RING_DELAY)
+ if ((now - entry.getValue()) > StorageService.RING_DELAY)
{
if (logger_.isDebugEnabled())
- logger_.debug(StreamOut.RING_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
+ logger_.debug(StorageService.RING_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
justRemovedEndPoints_.remove(entry.getKey());
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Sun Jan 31 00:29:37 2010
@@ -2,12 +2,10 @@
import java.io.*;
import java.net.Socket;
-import java.nio.ByteBuffer;
import org.apache.log4j.Logger;
-import org.apache.cassandra.net.io.IncomingStreamReader;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.streaming.IncomingStreamReader;
public class IncomingTcpConnection extends Thread
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Sun Jan 31 00:29:37 2010
@@ -35,7 +35,6 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.streaming.StreamOut;
/*
* The load balancing algorithm here is an implementation of
@@ -365,7 +364,7 @@
Thread.sleep(100);
}
// one more sleep in case there are some stragglers
- Thread.sleep(StreamOut.RING_DELAY);
+ Thread.sleep(StorageService.RING_DELAY);
}
catch (InterruptedException e)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sun Jan 31 00:29:37 2010
@@ -62,6 +62,8 @@
{
private static Logger logger_ = Logger.getLogger(StorageService.class);
+ public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized
+
public final static String MOVE_STATE = "MOVE";
// this must be a char that cannot be present in any token
@@ -316,10 +318,10 @@
isBootstrapMode = true;
SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
Gossiper.instance.addApplicationState(MOVE_STATE, new ApplicationState(STATE_BOOTSTRAPPING + Delimiter + partitioner_.getTokenFactory().toString(token)));
- logger_.info("bootstrap sleeping " + StreamOut.RING_DELAY);
+ logger_.info("bootstrap sleeping " + RING_DELAY);
try
{
- Thread.sleep(StreamOut.RING_DELAY);
+ Thread.sleep(RING_DELAY);
}
catch (InterruptedException e)
{
@@ -1265,8 +1267,8 @@
logger_.info("DECOMMISSIONING");
startLeaving();
- logger_.info("decommission sleeping " + StreamOut.RING_DELAY);
- Thread.sleep(StreamOut.RING_DELAY);
+ logger_.info("decommission sleeping " + RING_DELAY);
+ Thread.sleep(RING_DELAY);
Runnable finishLeaving = new Runnable()
{
@@ -1364,8 +1366,8 @@
logger_.info("starting move. leaving token " + getLocalToken());
startLeaving();
- logger_.info("move sleeping " + StreamOut.RING_DELAY);
- Thread.sleep(StreamOut.RING_DELAY);
+ logger_.info("move sleeping " + RING_DELAY);
+ Thread.sleep(RING_DELAY);
Runnable finishMoving = new WrappedRunnable()
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/CompletedFileStatus.java Sun Jan 31 00:29:37 2010
@@ -10,7 +10,7 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-public class CompletedFileStatus
+class CompletedFileStatus
{
private static ICompactSerializer<CompletedFileStatus> serializer_;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IStreamComplete.java Sun Jan 31 00:29:37 2010
@@ -22,7 +22,7 @@
import java.net.InetAddress;
-public interface IStreamComplete
+interface IStreamComplete
{
/*
* This callback if registered with the StreamContextManager is
Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (from r904930, incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java&r1=904930&r2=904932&rev=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IncomingStreamReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Sun Jan 31 00:29:37 2010
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.cassandra.net.io;
+package org.apache.cassandra.streaming;
import java.net.InetSocketAddress;
import java.net.InetAddress;
@@ -27,10 +27,6 @@
import org.apache.log4j.Logger;
import org.apache.cassandra.net.FileStreamTask;
-import org.apache.cassandra.streaming.CompletedFileStatus;
-import org.apache.cassandra.streaming.IStreamComplete;
-import org.apache.cassandra.streaming.InitiatedFile;
-import org.apache.cassandra.streaming.StreamInManager;
public class IncomingStreamReader
{
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/InitiatedFile.java Sun Jan 31 00:29:37 2010
@@ -6,7 +6,7 @@
import org.apache.cassandra.io.ICompactSerializer;
-public class InitiatedFile
+class InitiatedFile
{
private static ICompactSerializer<InitiatedFile> serializer_;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java Sun Jan 31 00:29:37 2010
@@ -29,7 +29,7 @@
import org.apache.log4j.Logger;
-public class StreamInManager
+class StreamInManager
{
private static final Logger logger = Logger.getLogger(StreamInManager.class);
Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java (from r904930, incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java&r1=904930&r2=904932&rev=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/StreamInitiateMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java Sun Jan 31 00:29:37 2010
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.cassandra.dht;
+package org.apache.cassandra.streaming;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -29,7 +29,7 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-public class StreamInitiateMessage
+class StreamInitiateMessage
{
private static ICompactSerializer<StreamInitiateMessage> serializer_;
@@ -62,32 +62,31 @@
{
return streamContexts_;
}
-}
-class StreamInitiateMessageSerializer implements ICompactSerializer<StreamInitiateMessage>
-{
- public void serialize(StreamInitiateMessage bim, DataOutputStream dos) throws IOException
+ private static class StreamInitiateMessageSerializer implements ICompactSerializer<StreamInitiateMessage>
{
- dos.writeInt(bim.streamContexts_.length);
- for ( InitiatedFile initiatedFile : bim.streamContexts_ )
+ public void serialize(StreamInitiateMessage bim, DataOutputStream dos) throws IOException
{
- InitiatedFile.serializer().serialize(initiatedFile, dos);
+ dos.writeInt(bim.streamContexts_.length);
+ for ( InitiatedFile initiatedFile : bim.streamContexts_ )
+ {
+ InitiatedFile.serializer().serialize(initiatedFile, dos);
+ }
}
- }
-
- public StreamInitiateMessage deserialize(DataInputStream dis) throws IOException
- {
- int size = dis.readInt();
- InitiatedFile[] initiatedFiles = new InitiatedFile[0];
- if ( size > 0 )
+
+ public StreamInitiateMessage deserialize(DataInputStream dis) throws IOException
{
- initiatedFiles = new InitiatedFile[size];
- for ( int i = 0; i < size; ++i )
+ int size = dis.readInt();
+ InitiatedFile[] initiatedFiles = new InitiatedFile[0];
+ if ( size > 0 )
{
- initiatedFiles[i] = InitiatedFile.serializer().deserialize(dis);
+ initiatedFiles = new InitiatedFile[size];
+ for ( int i = 0; i < size; ++i )
+ {
+ initiatedFiles[i] = InitiatedFile.serializer().deserialize(dis);
+ }
}
+ return new StreamInitiateMessage(initiatedFiles);
}
- return new StreamInitiateMessage(initiatedFiles);
}
}
-
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Sun Jan 31 00:29:37 2010
@@ -12,7 +12,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
-import org.apache.cassandra.dht.StreamInitiateMessage;
+import org.apache.cassandra.streaming.StreamInitiateMessage;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Sun Jan 31 00:29:37 2010
@@ -33,7 +33,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.StreamInitiateMessage;
+import org.apache.cassandra.streaming.StreamInitiateMessage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.SSTable;
@@ -60,7 +60,6 @@
private static Logger logger = Logger.getLogger(StreamOut.class);
static String TABLE_NAME = "STREAMING-TABLE-NAME";
- public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized
/**
* Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=904932&r1=904931&r2=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java Sun Jan 31 00:29:37 2010
@@ -37,7 +37,7 @@
/**
* This class manages the streaming of multiple files one after the other.
*/
-public class StreamOutManager
+class StreamOutManager
{
private static Logger logger = Logger.getLogger( StreamOutManager.class );
Copied: incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java (from r904930, incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?p2=incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java&p1=incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java&r1=904930&r2=904932&rev=904932&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java Sun Jan 31 00:29:37 2010
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.cassandra.dht;
+package org.apache.cassandra.streaming;
import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.*;
@@ -27,8 +27,6 @@
import java.util.Map;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.streaming.InitiatedFile;
-import org.apache.cassandra.streaming.StreamInitiateVerbHandler;
import org.junit.Test;
Propchange: incubator/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
------------------------------------------------------------------------------
svn:eol-style = native