You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/07 23:51:06 UTC
svn commit: r888170 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/net/io/ test/unit/org/apache/cassandra/io/
Author: jbellis
Date: Mon Dec 7 22:51:05 2009
New Revision: 888170
URL: http://svn.apache.org/viewvc?rev=888170&view=rev
Log:
cleanup Streaming and rename transferOneTable -> transferSSTables. patch by Stu Hood; reviewed by jbellis for CASSANDRA-520
Added:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java (with props)
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=888170&r1=888169&r2=888170&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Mon Dec 7 22:51:05 2009
@@ -51,7 +51,7 @@
public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized
/**
- * split out files on disk locally for each range and then stream them to the target endpoint
+ * Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
*/
public static void transferRanges(InetAddress target, Collection<Range> ranges, Runnable callback)
{
@@ -77,7 +77,7 @@
if (logger.isDebugEnabled())
logger.debug("Performing anticompaction ...");
/* Get the list of files that need to be streamed */
- transferOneTable(target, table.forceAntiCompaction(ranges, target), tName); // SSTR GC deletes the file when done
+ transferSSTables(target, table.forceAntiCompaction(ranges, target), tName); // SSTR GC deletes the file when done
}
catch (IOException e)
{
@@ -88,7 +88,11 @@
callback.run();
}
- private static void transferOneTable(InetAddress target, List<SSTableReader> sstables, String table) throws IOException
+ /**
+ * Transfers a group of sstables from a single table to the target endpoint
+ * and then marks them as ready for local deletion.
+ */
+ public static void transferSSTables(InetAddress target, List<SSTableReader> sstables, String table) throws IOException
{
StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[SSTable.FILES_ON_DISK * sstables.size()];
int i = 0;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=888170&r1=888169&r2=888170&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Mon Dec 7 22:51:05 2009
@@ -53,7 +53,6 @@
connection = new TcpConnection(from_, to_);
File file = new File(file_);
connection.stream(file, startPosition_, total_);
- MessagingService.setStreamingMode(false);
if (logger_.isDebugEnabled())
logger_.debug("Done streaming " + file);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=888170&r1=888169&r2=888170&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Dec 7 22:51:05 2009
@@ -57,8 +57,6 @@
};
private static Map<String, String> reservedVerbs_ = new Hashtable<String, String>();
- /* Indicate if we are currently streaming data to another node or receiving streaming data */
- private static AtomicBoolean isStreaming_ = new AtomicBoolean(false);
/* This records all the results mapped by message Id */
private static ICachetable<String, IAsyncCallback> callbackMap_;
@@ -485,17 +483,11 @@
public void stream(String file, long startPosition, long total, InetAddress from, InetAddress to)
{
- isStreaming_.set(true);
/* Streaming asynchronously on streamExector_ threads. */
Runnable streamingTask = new FileStreamTask(file, startPosition, total, from, to);
streamExecutor_.execute(streamingTask);
}
- public static void setStreamingMode(boolean bVal)
- {
- isStreaming_.set(bVal);
- }
-
public static void shutdown()
{
logger_.info("Shutting down ...");
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=888170&r1=888169&r2=888170&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Mon Dec 7 22:51:05 2009
@@ -459,7 +459,6 @@
}
else
{
- MessagingService.setStreamingMode(false);
/* Close this socket connection used for streaming */
closeSocket();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java?rev=888170&r1=888169&r2=888170&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java Mon Dec 7 22:51:05 2009
@@ -50,9 +50,6 @@
int stream = MessagingService.getBits(pH, 3, 1);
stream_.getProtocolHeader().isStreamingMode_ = (stream == 1) ? true : false;
- if ( stream_.getProtocolHeader().isStreamingMode_ )
- MessagingService.setStreamingMode(true);
-
int version = MessagingService.getBits(pH, 15, 8);
stream_.getProtocolHeader().version_ = version;
Added: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java?rev=888170&view=auto
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java (added)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java Mon Dec 7 22:51:05 2009
@@ -0,0 +1,64 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.*;
+
+import java.net.InetAddress;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.RangeReply;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.SSTableUtils;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.junit.Test;
+
+public class StreamingTest extends CleanupHelper
+{
+ public static final InetAddress LOCAL = FBUtilities.getLocalAddress();
+
+ @Test
+ public void testTransferTable() throws Exception
+ {
+ StorageService.instance().start();
+
+ // write a temporary SSTable, but don't register it
+ Set<String> content = new HashSet<String>();
+ content.add("key");
+ SSTableReader sstable = SSTableUtils.writeSSTable(content);
+ String tablename = sstable.getTableName();
+ String cfname = sstable.getColumnFamilyName();
+
+ // transfer
+ Streaming.transferSSTables(LOCAL, Arrays.asList(sstable), tablename);
+
+ // confirm that the SSTable was transferred and registered
+ ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);
+ RangeReply rr = cfstore.getKeyRange("", "", 2);
+ assert rr.keys.size() == 1;
+ assert rr.keys.contains("key");
+ }
+}
Propchange: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
------------------------------------------------------------------------------
svn:eol-style = native