You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/24 19:13:57 UTC
svn commit: r1062906 - in /cassandra/trunk: ./
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/migration/
src/java/org/apache/cassandra/loca...
Author: jbellis
Date: Mon Jan 24 18:13:56 2011
New Revision: 1062906
URL: http://svn.apache.org/viewvc?rev=1062906&view=rev
Log:
merge from 0.7
Removed:
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 24 18:13:56 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7:1026516-1062800
+/cassandra/branches/cassandra-0.7:1026516-1062901
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Jan 24 18:13:56 2011
@@ -20,7 +20,7 @@
* retry hadoop split requests on connection failure (CASSANDRA-1927)
* implement describeOwnership for BOP, COPP (CASSANDRA-1928)
* make read repair behave as expected for ConsistencyLevel > ONE
- (CASSANDRA-982)
+ (CASSANDRA-982, 2038)
* distributed test harness (CASSANDRA-1859, 1964)
* reduce flush lock contention (CASSANDRA-1930)
* optimize supercolumn deserialization (CASSANDRA-1891)
@@ -39,6 +39,10 @@
* add short options for CLI flags (CASSANDRA-1565)
* make keyspace argument to "describe keyspace" in CLI optional
when authenticated to keyspace already (CASSANDRA-2029)
+ * added option to specify -Dcassandra.join_ring=false on startup
+ to allow "warm spare" nodes or performing JMX maintenance before
+ joining the ring (CASSANDRA-526)
+ * log migrations at INFO (CASSANDRA-2028)
0.7.0-final
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 24 18:13:56 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1062800
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1062901
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 24 18:13:56 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1062800
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1062901
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 24 18:13:56 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1062800
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1062901
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 24 18:13:56 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1062800
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1062901
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 24 18:13:56 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1062800
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1062901
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/KSMetaData.java Mon Jan 24 18:13:56 2011
@@ -29,6 +29,7 @@ import org.apache.avro.util.Utf8;
import org.apache.cassandra.io.SerDeUtils;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.commons.lang.StringUtils;
public final class KSMetaData
{
@@ -93,6 +94,21 @@ public final class KSMetaData
return ks;
}
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(name)
+ .append("rep factor:")
+ .append(replicationFactor)
+ .append("rep strategy:")
+ .append(strategyClass.getSimpleName())
+ .append("{")
+ .append(StringUtils.join(cfMetaData.values(), ", "))
+ .append("}");
+ return sb.toString();
+ }
+
public static KSMetaData inflate(org.apache.cassandra.db.migration.avro.KsDef ks)
{
Class<AbstractReplicationStrategy> repStratClass;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Mon Jan 24 18:13:56 2011
@@ -67,17 +67,7 @@ public class ReadVerbHandler implements
ReadCommand command = ReadCommand.serializer().deserialize(new DataInputStream(readCtx.bufIn_));
Table table = Table.open(command.table);
Row row = command.getRow(table);
- ReadResponse readResponse;
- if (command.isDigestQuery())
- {
- if (logger_.isDebugEnabled())
- logger_.debug("digest is " + ByteBufferUtil.bytesToHex(ColumnFamily.digest(row.cf)));
- readResponse = new ReadResponse(ColumnFamily.digest(row.cf));
- }
- else
- {
- readResponse = new ReadResponse(row);
- }
+ ReadResponse readResponse = getResponse(command, row);
/* serialize the ReadResponseMessage. */
readCtx.bufOut_.reset();
@@ -97,4 +87,18 @@ public class ReadVerbHandler implements
throw new RuntimeException(ex);
}
}
+
+ public static ReadResponse getResponse(ReadCommand command, Row row)
+ {
+ if (command.isDigestQuery())
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("digest is " + ByteBufferUtil.bytesToHex(ColumnFamily.digest(row.cf)));
+ return new ReadResponse(ColumnFamily.digest(row.cf));
+ }
+ else
+ {
+ return new ReadResponse(row);
+ }
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java Mon Jan 24 18:13:56 2011
@@ -97,4 +97,10 @@ public class AddColumnFamily extends Mig
org.apache.cassandra.db.migration.avro.AddColumnFamily acf = (org.apache.cassandra.db.migration.avro.AddColumnFamily)mi.migration;
cfm = CFMetaData.inflate(acf.cf);
}
+
+ @Override
+ public String toString()
+ {
+ return "Add column family: " + cfm.toString();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddKeyspace.java Mon Jan 24 18:13:56 2011
@@ -88,4 +88,10 @@ public class AddKeyspace extends Migrati
org.apache.cassandra.db.migration.avro.AddKeyspace aks = (org.apache.cassandra.db.migration.avro.AddKeyspace)mi.migration;
ksm = KSMetaData.inflate(aks.ks);
}
+
+ @Override
+ public String toString()
+ {
+ return "Add keyspace: " + ksm.toString();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java Mon Jan 24 18:13:56 2011
@@ -113,4 +113,10 @@ public class DropColumnFamily extends Mi
tableName = dcf.ksname.toString();
cfName = dcf.cfname.toString();
}
+
+ @Override
+ public String toString()
+ {
+ return String.format("Drop column family: %s.%s", tableName, cfName);
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java Mon Jan 24 18:13:56 2011
@@ -102,4 +102,10 @@ public class DropKeyspace extends Migrat
org.apache.cassandra.db.migration.avro.DropKeyspace dks = (org.apache.cassandra.db.migration.avro.DropKeyspace)mi.migration;
name = dks.ksname.toString();
}
+
+ @Override
+ public String toString()
+ {
+ return "Drop keyspace: " + name;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Mon Jan 24 18:13:56 2011
@@ -130,7 +130,7 @@ public abstract class Migration
migration.apply();
// note that we're storing this in the system table, which is not replicated
- logger.debug("Applying migration " + newVersion.toString());
+ logger.info("Applying migration {} {}", newVersion.toString(), toString());
migration = new RowMutation(Table.SYSTEM_TABLE, LAST_MIGRATION_KEY);
migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY), ByteBuffer.wrap(UUIDGen.decompose(newVersion)), now);
migration.apply();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameColumnFamily.java Mon Jan 24 18:13:56 2011
@@ -125,4 +125,10 @@ public class RenameColumnFamily extends
oldName = rcf.old_cfname.toString();
newName = rcf.new_cfname.toString();
}
+
+ @Override
+ public String toString()
+ {
+ return String.format("Rename column family (%d) %s.%s to %s.%s", cfId, tableName, oldName, tableName, newName);
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/RenameKeyspace.java Mon Jan 24 18:13:56 2011
@@ -143,4 +143,10 @@ public class RenameKeyspace extends Migr
oldName = rks.old_ksname.toString();
newName = rks.new_ksname.toString();
}
+
+ @Override
+ public String toString()
+ {
+ return String.format("Rename keyspace %s to %s", oldName, newName);
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java Mon Jan 24 18:13:56 2011
@@ -100,4 +100,10 @@ public class UpdateColumnFamily extends
org.apache.cassandra.db.migration.avro.UpdateColumnFamily update = (org.apache.cassandra.db.migration.avro.UpdateColumnFamily)mi.migration;
metadata = CFMetaData.inflate(update.metadata);
}
+
+ @Override
+ public String toString()
+ {
+ return String.format("Update column family to %s", metadata.toString());
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateKeyspace.java Mon Jan 24 18:13:56 2011
@@ -88,4 +88,10 @@ public class UpdateKeyspace extends Migr
newKsm = KSMetaData.inflate(uks.newKs);
oldKsm = KSMetaData.inflate(uks.oldKs);
}
+
+ @Override
+ public String toString()
+ {
+ return String.format("Update keyspace %s to %s", oldKsm.toString(), newKsm.toString());
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java Mon Jan 24 18:13:56 2011
@@ -18,10 +18,8 @@
package org.apache.cassandra.locator;
-import java.io.BufferedReader;
-import java.io.FileReader;
import java.io.IOException;
-import java.io.Reader;
+import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
@@ -113,13 +111,12 @@ public class PropertyFileSnitch extends
{
HashMap<InetAddress, String[]> reloadedMap = new HashMap<InetAddress, String[]>();
- String rackPropertyFilename = FBUtilities.resourceToFile(RACK_PROPERTY_FILENAME);
Properties properties = new Properties();
- Reader reader = null;
+ InputStream stream = null;
try
{
- reader = new BufferedReader(new FileReader(rackPropertyFilename));
- properties.load(reader);
+ stream = getClass().getClassLoader().getResourceAsStream(RACK_PROPERTY_FILENAME);
+ properties.load(stream);
}
catch (IOException e)
{
@@ -127,7 +124,7 @@ public class PropertyFileSnitch extends
}
finally
{
- FileUtils.closeQuietly(reader);
+ FileUtils.closeQuietly(stream);
}
for (Map.Entry<Object, Object> entry : properties.entrySet())
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Mon Jan 24 18:13:56 2011
@@ -45,12 +45,12 @@ public class Header
return serializer_;
}
- private InetAddress from_;
+ private final InetAddress from_;
// TODO STAGE can be determined from verb
- private StorageService.Verb verb_;
- private String messageId_;
+ private final StorageService.Verb verb_;
+ private final String messageId_;
protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
-
+
Header(String id, InetAddress from, StorageService.Verb verb)
{
assert id != null;
@@ -88,12 +88,7 @@ public class Header
return messageId_;
}
- void setMessageId(String id)
- {
- messageId_ = id;
- }
-
- byte[] getDetail(Object key)
+ byte[] getDetail(String key)
{
return details_.get(key);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Mon Jan 24 18:13:56 2011
@@ -59,7 +59,7 @@ public class Message
this(new Header(from, verb), body);
}
- public byte[] getHeader(Object key)
+ public byte[] getHeader(String key)
{
return header_.getDetail(key);
}
@@ -94,11 +94,6 @@ public class Message
return header_.getMessageId();
}
- void setMessageId(String id)
- {
- header_.setMessageId(id);
- }
-
// TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len
public Message getReply(InetAddress from, byte[] args)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Jan 24 18:13:56 2011
@@ -331,34 +331,6 @@ public final class MessagingService impl
}
/**
- * Send a message to a given endpoint. The ith element in the <code>messages</code>
- * array is sent to the ith element in the <code>to</code> array.This method assumes
- * there is a one-one mapping between the <code>messages</code> array and
- * the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown.
- * This method also informs the MessagingService to wait for at least
- * <code>howManyResults</code> responses to determine success of failure.
- * @param messages messages to be sent.
- * @param to endpoints to which the message needs to be sent
- * @param cb callback interface which is used to pass the responses or
- * suggest that a timeout occured to the invoker of the send().
- * @return an reference to message id used to match with the result
- */
- public String sendRR(Message[] messages, List<InetAddress> to, IAsyncCallback cb)
- {
- if (messages.length != to.size())
- throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
- String groupId = GuidGenerator.guid();
- addCallback(cb, groupId);
- for ( int i = 0; i < messages.length; ++i )
- {
- messages[i].setMessageId(groupId);
- putTarget(groupId, to.get(i));
- sendOneWay(messages[i], to.get(i));
- }
- return groupId;
- }
-
- /**
* Send a message to a given endpoint. This method adheres to the fire and forget
* style messaging.
* @param message messages to be sent.
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java Mon Jan 24 18:13:56 2011
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
@@ -89,6 +90,15 @@ public class ReadCallback<T> implements
if (resolver.isDataPresent())
condition.signal();
}
+
+ public void response(ReadResponse result)
+ {
+ ((ReadResponseResolver) resolver).injectPreProcessed(result);
+ if (resolver.getMessageCount() < blockfor)
+ return;
+ if (resolver.isDataPresent())
+ condition.signal();
+ }
public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Mon Jan 24 18:13:56 2011
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +52,7 @@ public class ReadResponseResolver implem
private final ConcurrentMap<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
private DecoratedKey key;
private ByteBuffer digest;
+ private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);;
public ReadResponseResolver(String table, ByteBuffer key)
{
@@ -248,10 +250,11 @@ public class ReadResponseResolver implem
}
}
- /** hack so ConsistencyChecker doesn't have to serialize/deserialize an extra real Message */
- public void injectPreProcessed(Message message, ReadResponse result)
+ /** hack so local reads don't force de/serialization of an extra real Message */
+ public void injectPreProcessed(ReadResponse result)
{
- results.put(message, result);
+ assert results.get(FAKE_MESSAGE) == null; // should only be one local reply
+ results.put(FAKE_MESSAGE, result);
}
public boolean isDataPresent()
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jan 24 18:13:56 2011
@@ -526,14 +526,9 @@ public class StorageProxy implements Sto
for (ReadCommand command: commands)
{
assert !command.isDigestQuery();
- ReadCommand readMessageDigestOnly = command.copy();
- readMessageDigestOnly.setDigestQuery(true);
- Message message = command.makeReadMessage();
- Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
- InetAddress dataPoint = endpoints.get(0);
ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
ReadCallback<Row> handler = getReadCallback(resolver, command.table, consistency_level);
@@ -549,19 +544,52 @@ public class StorageProxy implements Sto
{
endpoints = endpoints.subList(0, handler.blockfor);
}
- Message[] messages = new Message[endpoints.size()];
-
- // data-request message is sent to dataPoint, the node that will actually get
+
+ // The data-request message is sent to dataPoint, the node that will actually get
// the data for us. The other replicas are only sent a digest query.
- for (int i = 0; i < messages.length; i++)
+ ReadCommand digestCommand = null;
+ if (endpoints.size() > 1)
+ {
+ digestCommand = command.copy();
+ digestCommand.setDigestQuery(true);
+ }
+
+ InetAddress dataPoint = endpoints.get(0);
+ if (dataPoint.equals(FBUtilities.getLocalAddress()))
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("reading data for " + command + " locally");
+ StageManager.getStage(Stage.READ).submit(new WeakReadLocalRunnable(command, handler));
+ }
+ else
{
- InetAddress endpoint = endpoints.get(i);
- Message m = endpoint.equals(dataPoint) ? message : messageDigestOnly;
- messages[i] = m;
+ Message message = command.makeReadMessage();
if (logger.isDebugEnabled())
- logger.debug("reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
+ logger.debug("reading digest for " + command + " from " + message.getMessageId() + "@" + dataPoint);
+ MessagingService.instance().sendRR(message, dataPoint, handler);
}
- MessagingService.instance().sendRR(messages, endpoints, handler);
+
+ // We lazy-construct the digest Message object since it may not be necessary if we
+ // are doing a local digest read, or no digest reads at all.
+ Message digestMessage = null;
+ for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
+ {
+ if (digestPoint.equals(FBUtilities.getLocalAddress()))
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("reading digest for " + command + " locally");
+ StageManager.getStage(Stage.READ).submit(new WeakReadLocalRunnable(digestCommand, handler));
+ }
+ else
+ {
+ if (digestMessage == null)
+ digestMessage = digestCommand.makeReadMessage();
+ if (logger.isDebugEnabled())
+ logger.debug("reading digest for " + command + " from " + digestMessage.getMessageId() + "@" + digestPoint);
+ MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
+ }
+ }
+
readCallbacks.add(handler);
commandEndpoints.add(endpoints);
}
@@ -619,6 +647,28 @@ public class StorageProxy implements Sto
return rows;
}
+ static class WeakReadLocalRunnable extends WrappedRunnable
+ {
+ private final ReadCommand command;
+ private final ReadCallback<Row> handler;
+
+ WeakReadLocalRunnable(ReadCommand command, ReadCallback<Row> handler)
+ {
+ this.command = command;
+ this.handler = handler;
+ }
+
+ protected void runMayThrow() throws IOException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("weakreadlocal reading " + command);
+
+ Table table = Table.open(command.table);
+ ReadResponse result = ReadVerbHandler.getResponse(command, command.getRow(table));
+ handler.response(result);
+ }
+ }
+
static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, String table, ConsistencyLevel consistencyLevel)
{
if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Jan 24 18:13:56 2011
@@ -171,15 +171,6 @@ public class StorageService implements I
/* This abstraction maintains the token/endpoint metadata information */
private TokenMetadata tokenMetadata_ = new TokenMetadata();
- /* This thread pool does consistency checks when the client doesn't care about consistency */
- private ExecutorService consistencyManager_ = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getConsistencyThreads(),
- DatabaseDescriptor.getConsistencyThreads(),
- StageManager.KEEPALIVE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("ReadRepair"),
- "request");
-
private Set<InetAddress> replicatingNodes;
private InetAddress removingNode;
@@ -188,6 +179,7 @@ public class StorageService implements I
/* when intialized as a client, we shouldn't write to the system table. */
private boolean isClientMode;
private boolean initialized;
+ private volatile boolean joined = false;
private String operationMode;
private MigrationManager migrationManager = new MigrationManager();
@@ -353,7 +345,20 @@ public class StorageService implements I
}
}
+ if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))
+ {
+ joinTokenRing();
+ }
+ else
+ {
+ logger_.info("Not joining ring as requested. Use JMX (StorageService->joinRing()) to initiate ring joining");
+ }
+ }
+
+ private void joinTokenRing() throws IOException, org.apache.cassandra.config.ConfigurationException
+ {
logger_.info("Starting up server gossip");
+ joined = true;
// have to start the gossip service before we can see any info on other nodes. this is necessary
// for bootstrap to get the load info it needs.
@@ -425,6 +430,20 @@ public class StorageService implements I
assert tokenMetadata_.sortedTokens().size() > 0;
}
+ public synchronized void joinRing() throws IOException, org.apache.cassandra.config.ConfigurationException
+ {
+ if (!joined)
+ {
+ logger_.info("Joining ring by operator request");
+ joinTokenRing();
+ }
+ }
+
+ public boolean isJoined()
+ {
+ return joined;
+ }
+
private void setMode(String m, boolean log)
{
operationMode = m;
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1062906&r1=1062905&r2=1062906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Mon Jan 24 18:13:56 2011
@@ -283,4 +283,8 @@ public interface StorageServiceMBean
public void invalidateKeyCaches(String ks, String... cfs) throws IOException;
public void invalidateRowCaches(String ks, String... cfs) throws IOException;
+
+ // allows a node that have been started without joining the ring to join it
+ public void joinRing() throws IOException, org.apache.cassandra.config.ConfigurationException;
+ public boolean isJoined();
}