You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/02/05 21:19:32 UTC
svn commit: r1067508 - in /cassandra/trunk: ./ conf/ debian/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/cli/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/marshal/ src/java/org/apache/cassa...
Author: brandonwilliams
Date: Sat Feb 5 20:19:31 2011
New Revision: 1067508
URL: http://svn.apache.org/viewvc?rev=1067508&view=rev
Log:
Merge from 0.7. I hope.
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/cassandra-env.sh
cassandra/trunk/debian/changelog
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java
cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
cassandra/trunk/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java
cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 5 20:19:31 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7:1026516-1066873
+/cassandra/branches/cassandra-0.7:1026516-1067497
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Sat Feb 5 20:19:31 2011
@@ -13,7 +13,6 @@
the nagle/delayed ack problem (CASSANDRA-1896)
* check log4j configuration for changes every 10s (CASSANDRA-1525, 1907)
* more-efficient cross-DC replication (CASSANDRA-1530, -2051)
- * upgrade to TFastFramedTransport (CASSANDRA-1743)
* avoid polluting page cache with commitlog or sstable writes
and seq scan operations (CASSANDRA-1470)
* add RMI authentication options to nodetool (CASSANDRA-1921)
@@ -62,6 +61,7 @@
* ignore messages from newer versions, keep track of nodes in gossip
regardless of version (CASSANDRA-1970)
+
0.7.0-final
* fix offsets to ByteBuffer.get (CASSANDRA-1939)
Modified: cassandra/trunk/conf/cassandra-env.sh
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra-env.sh?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra-env.sh (original)
+++ cassandra/trunk/conf/cassandra-env.sh Sat Feb 5 20:19:31 2011
@@ -132,6 +132,9 @@ JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatin
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime"
# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc.log"
+# uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414
+# JVM_OPTS="$JVM_OPTS -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1414"
+
# Prefer binding to IPv4 network intefaces (when net.ipv6.bindv6only=1). See
# http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:
# comment out this entry to enable IPv6 support).
Modified: cassandra/trunk/debian/changelog
URL: http://svn.apache.org/viewvc/cassandra/trunk/debian/changelog?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/debian/changelog (original)
+++ cassandra/trunk/debian/changelog Sat Feb 5 20:19:31 2011
@@ -2,7 +2,7 @@ cassandra (0.7.1) unstable; urgency=low
* New stable point release.
- -- Eric Evans <ee...@apache.org> Fri, 28 Jan 2011 13:56:19 -0600
+ -- Eric Evans <ee...@apache.org> Fri, 04 Feb 2011 12:57:52 -0600
cassandra (0.7.0~rc4) unstable; urgency=low
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 5 20:19:31 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1066873
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1067497
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 5 20:19:31 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1066873
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1067497
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 5 20:19:31 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1066873
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1067497
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 5 20:19:31 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1066873
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1067497
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 5 20:19:31 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1066873
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1067497
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Sat Feb 5 20:19:31 2011
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
import java.util.*;
import com.google.common.base.Charsets;
@@ -963,7 +964,7 @@ public class CliClient extends CliUserHe
}
private void executeList(Tree statement)
- throws TException, InvalidRequestException, NotFoundException, IllegalAccessException, InstantiationException, NoSuchFieldException, UnavailableException, TimedOutException
+ throws TException, InvalidRequestException, NotFoundException, IllegalAccessException, InstantiationException, NoSuchFieldException, UnavailableException, TimedOutException, CharacterCodingException
{
if (!CliMain.isConnected() || !hasKeySpace())
return;
@@ -1923,7 +1924,7 @@ public class CliClient extends CliUserHe
* @throws NoSuchFieldException - column not found
*/
private void printSliceList(CfDef columnFamilyDef, List<KeySlice> slices)
- throws NotFoundException, TException, IllegalAccessException, InstantiationException, NoSuchFieldException
+ throws NotFoundException, TException, IllegalAccessException, InstantiationException, NoSuchFieldException, CharacterCodingException
{
AbstractType validator;
String columnFamilyName = columnFamilyDef.getName();
Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java Sat Feb 5 20:19:31 2011
@@ -57,9 +57,9 @@ public class CliUserHelp {
put(ColumnFamilyArgument.COMMENT, "Human-readable column family description. Any string is acceptable");
put(ColumnFamilyArgument.COMPARATOR, "The class used as a comparator when sorting column names.\n Valid options include: AsciiType, BytesType, LexicalUUIDType,\n LongType, TimeUUIDType, and UTF8Type");
put(ColumnFamilyArgument.SUBCOMPARATOR, "Comparator for sorting subcolumn names, for Super columns only");
- put(ColumnFamilyArgument.MEMTABLE_OPERATIONS, "Flush memtables after this many operations");
- put(ColumnFamilyArgument.MEMTABLE_THROUGHPUT, "... or after this many bytes have been written");
- put(ColumnFamilyArgument.MEMTABLE_FLUSH_AFTER, "... or after this many seconds");
+ put(ColumnFamilyArgument.MEMTABLE_OPERATIONS, "Flush memtables after this many operations (in millions)");
+ put(ColumnFamilyArgument.MEMTABLE_THROUGHPUT, "... or after this many MB have been written");
+ put(ColumnFamilyArgument.MEMTABLE_FLUSH_AFTER, "... or after this many minutes");
put(ColumnFamilyArgument.ROWS_CACHED, "Number or percentage of rows to cache");
put(ColumnFamilyArgument.ROW_CACHE_SAVE_PERIOD, "Period with which to persist the row cache, in seconds");
put(ColumnFamilyArgument.KEYS_CACHED, "Number or percentage of keys to cache");
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java Sat Feb 5 20:19:31 2011
@@ -72,6 +72,8 @@ public class DefinitionsUpdateResponseVe
try
{
m.apply();
+ // update gossip, but don't contact nodes directly
+ m.passiveAnnounce();
}
catch (ConfigurationException ex)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Sat Feb 5 20:19:31 2011
@@ -23,11 +23,14 @@ import java.lang.management.ManagementFa
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import static com.google.common.base.Charsets.UTF_8;
+
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +45,7 @@ import org.apache.cassandra.dht.IPartiti
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.*;
@@ -226,17 +230,53 @@ public class HintedHandOffManager implem
int index = ByteBufferUtil.lastIndexOf(joined, SEPARATOR.getBytes()[0], joined.limit());
if (index == -1 || index < (joined.position() + 1))
- throw new RuntimeException("Corrupted hint name " + ByteBufferUtil.string(joined));
+ throw new RuntimeException("Corrupted hint name " + ByteBufferUtil.bytesToHex(joined));
+
+ try
+ {
+ return new String[] { ByteBufferUtil.string(joined, joined.position(), index - joined.position()),
+ ByteBufferUtil.string(joined, index + 1, joined.limit() - (index + 1)) };
+ }
+ catch (CharacterCodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
- return new String[] {
- ByteBufferUtil.string(joined, joined.position(), index - joined.position()),
- ByteBufferUtil.string(joined, index + 1, joined.limit() - (index + 1))
- };
+ private int waitForSchemaAgreement(InetAddress endpoint) throws InterruptedException
+ {
+ Gossiper gossiper = Gossiper.instance;
+ int waited = 0;
+ while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
+ gossiper.getEndpointStateForEndpoint(FBUtilities.getLocalAddress()).getApplicationState(ApplicationState.SCHEMA).value))
+ {
+ Thread.sleep(1000);
+ waited += 1000;
+ if (waited > 2 * StorageService.RING_DELAY)
+ throw new RuntimeException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
+ }
+ logger_.debug("schema for {} matches local schema", endpoint);
+ return waited;
}
- private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
+ private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException, InterruptedException
{
+ logger_.info("Checking remote schema before delivering hints");
+ int waited = waitForSchemaAgreement(endpoint);
+ // sleep a random amount to stagger handoff delivery from different replicas.
+ // (if we had to wait, then gossiper randomness took care of that for us already.)
+ if (waited == 0) {
+ int sleep = new Random().nextInt(60000);
+ logger_.info("Sleeping {}ms to stagger hint delivery", sleep);
+ Thread.sleep(sleep);
+ }
+ if (!Gossiper.instance.getEndpointStateForEndpoint(endpoint).isAlive())
+ {
+ logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
+ return;
+ }
logger_.info("Started hinted handoff for endpoint " + endpoint);
+
queuedDeliveries.remove(endpoint);
// 1. Get the key of the endpoint we need to handoff
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java Sat Feb 5 20:19:31 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.db.marshal;
import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
import com.google.common.base.Charsets;
@@ -36,7 +37,14 @@ public class AsciiType extends BytesType
@Override
public String getString(ByteBuffer bytes)
{
- return ByteBufferUtil.string(bytes, Charsets.US_ASCII);
+ try
+ {
+ return ByteBufferUtil.string(bytes, Charsets.US_ASCII);
+ }
+ catch (CharacterCodingException e)
+ {
+ throw new MarshalException("Invalid ascii bytes " + ByteBufferUtil.bytesToHex(bytes));
+ }
}
public ByteBuffer fromString(String source)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java Sat Feb 5 20:19:31 2011
@@ -22,12 +22,10 @@ package org.apache.cassandra.db.marshal;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
-import java.util.Arrays;
import com.google.common.base.Charsets;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
public class UTF8Type extends BytesType
{
@@ -39,11 +37,11 @@ public class UTF8Type extends BytesType
{
try
{
- return FBUtilities.decodeToUTF8(bytes);
+ return ByteBufferUtil.string(bytes, Charsets.UTF_8);
}
catch (CharacterCodingException e)
{
- throw new MarshalException("invalid UTF8 bytes " + ByteBufferUtil.string(bytes));
+ throw new MarshalException("invalid UTF8 bytes " + ByteBufferUtil.bytesToHex(bytes));
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Sat Feb 5 20:19:31 2011
@@ -175,10 +175,15 @@ public abstract class Migration
if (StorageService.instance.isClientMode())
return;
- // immediate notification for esiting nodes.
+ // immediate notification for existing nodes.
MigrationManager.announce(newVersion, Gossiper.instance.getLiveMembers());
}
-
+
+ public final void passiveAnnounce()
+ {
+ MigrationManager.passiveAnnounce(newVersion);
+ }
+
public static UUID getLastMigrationId()
{
DecoratedKey dkey = StorageService.getPartitioner().decorateKey(LAST_MIGRATION_KEY);
Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java Sat Feb 5 20:19:31 2011
@@ -25,7 +25,9 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
-import org.apache.cassandra.utils.FBUtilities;
+import com.google.common.base.Charsets;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
public class CollatingOrderPreservingPartitioner extends AbstractByteOrderedPartitioner
{
@@ -39,7 +41,7 @@ public class CollatingOrderPreservingPar
String skey;
try
{
- skey = FBUtilities.decodeToUTF8(key);
+ skey = ByteBufferUtil.string(key, Charsets.UTF_8);
}
catch (CharacterCodingException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java Sat Feb 5 20:19:31 2011
@@ -123,7 +123,14 @@ public class OrderPreservingPartitioner
public Token<String> fromByteArray(ByteBuffer bytes)
{
- return new StringToken(ByteBufferUtil.string(bytes, Charsets.UTF_8));
+ try
+ {
+ return new StringToken(ByteBufferUtil.string(bytes, Charsets.UTF_8));
+ }
+ catch (CharacterCodingException e)
+ {
+ throw new RuntimeException(e);
+ }
}
public String toString(Token<String> stringToken)
@@ -152,7 +159,7 @@ public class OrderPreservingPartitioner
String skey;
try
{
- skey = FBUtilities.decodeToUTF8(key);
+ skey = ByteBufferUtil.string(key, Charsets.UTF_8);
}
catch (CharacterCodingException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java Sat Feb 5 20:19:31 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.dht;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
import java.util.*;
import org.apache.cassandra.db.DecoratedKey;
@@ -61,7 +62,15 @@ public class RandomPartitioner implement
assert splitPoint != -1;
// and decode the token and key
- String token = ByteBufferUtil.string(fromdisk, fromdisk.position(), splitPoint - fromdisk.position(), UTF_8);
+ String token = null;
+ try
+ {
+ token = ByteBufferUtil.string(fromdisk, fromdisk.position(), splitPoint - fromdisk.position(), UTF_8);
+ }
+ catch (CharacterCodingException e)
+ {
+ throw new RuntimeException(e);
+ }
ByteBuffer key = fromdisk.duplicate();
key.position(splitPoint + 1);
return new DecoratedKey<BigIntegerToken>(new BigIntegerToken(token), key);
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Sat Feb 5 20:19:31 2011
@@ -257,7 +257,7 @@ public class Gossiper implements IFailur
liveEndpoints.remove(endpoint);
unreachableEndpoints.remove(endpoint);
- endpointStateMap.remove(endpoint);
+ // do not remove endpointState until the quarantine expires
FailureDetector.instance.remove(endpoint);
versions.remove(endpoint);
justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
@@ -325,9 +325,7 @@ public class Gossiper implements IFailur
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
- if (logger.isTraceEnabled())
- logger.trace("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray());
+ return new Message(localEndpoint_, StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray());
}
Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
@@ -433,7 +431,8 @@ public class Gossiper implements IFailur
else
{
logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip");
- removeEndpoint(endpoint);
+ if (!justRemovedEndpoints_.containsKey(endpoint)) // if the node was decommissioned, it will have been removed but still appear as a fat client
+ removeEndpoint(endpoint); // after quarantine justRemoveEndpoints will remove the state
}
}
@@ -453,6 +452,7 @@ public class Gossiper implements IFailur
if (logger.isDebugEnabled())
logger.debug(QUARANTINE_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
justRemovedEndpoints.remove(entry.getKey());
+ endpointStateMap_.remove(entry.getKey());
}
}
}
@@ -465,8 +465,6 @@ public class Gossiper implements IFailur
EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
{
- if (logger.isTraceEnabled())
- logger.trace("Scanning for state greater than " + version + " for " + forEndpoint);
EndpointState epState = endpointStateMap.get(forEndpoint);
EndpointState reqdEndpointState = null;
@@ -484,6 +482,8 @@ public class Gossiper implements IFailur
if ( localHbVersion > version )
{
reqdEndpointState = new EndpointState(epState.getHeartBeatState());
+ if (logger_.isTraceEnabled())
+ logger_.trace("local heartbeat version " + localHbVersion + " greater than " + version + " for " + forEndpoint);
}
/* Accumulate all application states whose versions are greater than "version" variable */
for (Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet())
@@ -656,6 +656,11 @@ public class Gossiper implements IFailur
else if (logger.isTraceEnabled())
logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep);
}
+ else
+ {
+ if (logger_.isTraceEnabled())
+ logger_.trace("Ignoring remote generation " + remoteGeneration + " < " + localGeneration);
+ }
}
else
{
@@ -671,9 +676,9 @@ public class Gossiper implements IFailur
int oldVersion = localState.getHeartBeatState().getHeartBeatVersion();
Map<ApplicationState, VersionedValue> localAppStateMap = localState.getApplicationStateMap();
- localState.setHeartBeatState(remoteState.getHeartBeatState());
- if (logger.isTraceEnabled())
- logger.trace("Updating heartbeat state version to " + localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + " for " + addr + " ...");
+ localState.setHeartBeatState(remoteHbState);
+ if (logger_.isTraceEnabled())
+ logger_.trace("Updating heartbeat state generation to " + remoteHbState.getGeneration() + " from " + localHbState.getGeneration() + " for " + addr);
for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet())
{
@@ -700,6 +705,8 @@ public class Gossiper implements IFailur
{
/* We are here since we have no data for this endpoint locally so request everthing. */
deltaGossipDigestList.add( new GossipDigest(gDigest.getEndpoint(), remoteGeneration, 0) );
+ if (logger_.isTraceEnabled())
+ logger_.trace("requestAll for " + gDigest.getEndpoint());
}
/* Send all the data with version greater than maxRemoteVersion */
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Sat Feb 5 20:19:31 2011
@@ -89,15 +89,21 @@ public class MigrationManager implements
}
}
- /** announce my version to a set of hosts. They may culminate with them sending me migrations. */
+ /** actively announce my version to a set of hosts via rpc. They may culminate with them sending me migrations. */
public static void announce(UUID version, Set<InetAddress> hosts)
{
Message msg = makeVersionMessage(version);
for (InetAddress host : hosts)
MessagingService.instance().sendOneWay(msg, host);
- // this is for notifying nodes as they arrive in the cluster.
+ passiveAnnounce(version);
+ }
+
+ /** announce my version passively over gossip **/
+ public static void passiveAnnounce(UUID version)
+ {
if (!StorageService.instance.isClientMode())
Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version));
+ logger.debug("Announcing my schema is " + version);
}
/**
@@ -152,6 +158,7 @@ public class MigrationManager implements
throw new IOException(e);
}
}
+ passiveAnnounce(to); // we don't need to send rpcs, but we need to update gossip
}
/** pushes migrations from this host to another host */
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sat Feb 5 20:19:31 2011
@@ -1602,9 +1602,10 @@ public class StorageService implements I
calculatePendingRanges();
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalToken()));
+ logger_.info("Announcing that I have left the ring for " + RING_DELAY + "ms");
try
{
- Thread.sleep(2 * Gossiper.intervalInMillis);
+ Thread.sleep(RING_DELAY);
}
catch (InterruptedException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Sat Feb 5 20:19:31 2011
@@ -18,7 +18,6 @@
package org.apache.cassandra.thrift;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
@@ -27,11 +26,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TFastFramedTransport;
+import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
@@ -123,8 +121,8 @@ public class CassandraDaemon extends org
if (DatabaseDescriptor.isThriftFramed())
{
int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize();
- inTransportFactory = new TFastFramedTransport.Factory(64 * 1024, tFramedTransportSize);
- outTransportFactory = new TFastFramedTransport.Factory(64 * 1024, tFramedTransportSize);
+ inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+ outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
logger.info("Using TFastFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
}
else
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Sat Feb 5 20:19:31 2011
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.Arrays;
@@ -100,28 +101,27 @@ public class ByteBufferUtil
return compareUnsigned(o1, ByteBuffer.wrap(o2));
}
- public static String string(ByteBuffer buffer)
+ public static String string(ByteBuffer buffer) throws CharacterCodingException
{
return string(buffer, Charset.defaultCharset());
}
- public static String string(ByteBuffer buffer, Charset charset)
+ public static String string(ByteBuffer buffer, int offset, int length) throws CharacterCodingException
{
- return string(buffer, buffer.position(), buffer.remaining(), charset);
+ return string(buffer, offset, length, Charset.defaultCharset());
}
- public static String string(ByteBuffer buffer, int offset, int length)
+ public static String string(ByteBuffer buffer, int offset, int length, Charset charset) throws CharacterCodingException
{
- return string(buffer, offset, length, Charset.defaultCharset());
+ ByteBuffer copy = buffer.duplicate();
+ copy.position(buffer.position() + offset);
+ copy.limit(copy.position() + length);
+ return string(buffer, charset);
}
- public static String string(ByteBuffer buffer, int offset, int length, Charset charset)
+ public static String string(ByteBuffer buffer, Charset charset) throws CharacterCodingException
{
- if (buffer.hasArray())
- return new String(buffer.array(), buffer.arrayOffset() + offset, length, charset);
-
- byte[] buff = getArray(buffer, offset, length);
- return new String(buff, charset);
+ return charset.newDecoder().decode(buffer.duplicate()).toString();
}
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Sat Feb 5 20:19:31 2011
@@ -452,11 +452,6 @@ public class FBUtilities
return utflen;
}
- public static String decodeToUTF8(ByteBuffer bytes) throws CharacterCodingException
- {
- return Charsets.UTF_8.newDecoder().decode(bytes.duplicate()).toString();
- }
-
public static String resourceToFile(String filename) throws ConfigurationException
{
ClassLoader loader = PropertyFileSnitch.class.getClassLoader();
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Sat Feb 5 20:19:31 2011
@@ -19,6 +19,7 @@
package org.apache.cassandra.db;
import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.*;
@@ -511,7 +512,14 @@ public class TableTest extends CleanupHe
List<String> L = new ArrayList<String>();
for (IColumn column : columns)
{
- L.add(ByteBufferUtil.string(column.name()));
+ try
+ {
+ L.add(ByteBufferUtil.string(column.name()));
+ }
+ catch (CharacterCodingException e)
+ {
+ throw new AssertionError(e);
+ }
}
List<String> names = new ArrayList<String>(columnNames.length);
Modified: cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java?rev=1067508&r1=1067507&r2=1067508&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java Sat Feb 5 20:19:31 2011
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.Arrays;
+import com.google.common.base.Charsets;
import org.junit.Test;
public class FBUtilitiesTest
@@ -126,6 +127,6 @@ public class FBUtilitiesTest
public void testDecode() throws IOException
{
ByteBuffer bytes = ByteBuffer.wrap(new byte[]{(byte)0xff, (byte)0xfe});
- FBUtilities.decodeToUTF8(bytes);
+ ByteBufferUtil.string(bytes, Charsets.UTF_8);
}
}