You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/02/23 20:32:44 UTC
svn commit: r1073896 [1/2] - in /cassandra/trunk: ./ contrib/
contrib/client_only/conf/ contrib/client_only/src/
contrib/stress/src/org/apache/cassandra/contrib/stress/
contrib/stress/src/org/apache/cassandra/contrib/stress/util/ debian/
interface/thri...
Author: gdusbabek
Date: Wed Feb 23 19:32:42 2011
New Revision: 1073896
URL: http://svn.apache.org/viewvc?rev=1073896&view=rev
Log:
merge from 0.7
Added:
cassandra/trunk/test/data/corrupt-sstables/
- copied from r1073884, cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/
cassandra/trunk/test/data/corrupt-sstables/Super5-f-2-Data.db
- copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Data.db
cassandra/trunk/test/data/corrupt-sstables/Super5-f-2-Filter.db
- copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Filter.db
cassandra/trunk/test/data/corrupt-sstables/Super5-f-2-Index.db
- copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Index.db
cassandra/trunk/test/data/corrupt-sstables/Super5-f-2-Statistics.db
- copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Statistics.db
cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java
- copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/RowCacheTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/ScrubTest.java
- copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ScrubTest.java
cassandra/trunk/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java
- copied unchanged from r1073884, cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/utils/ByteBufferUtilTest.java
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/NEWS.txt
cassandra/trunk/build.xml
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/contrib/client_only/conf/cassandra.yaml
cassandra/trunk/contrib/client_only/src/ClientOnlyExample.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java
cassandra/trunk/debian/init
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/auth/SimpleAuthenticator.java
cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java
cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java
cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
cassandra/trunk/src/java/org/apache/cassandra/utils/GuidGenerator.java
cassandra/trunk/test/conf/cassandra.yaml
cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7:1026516-1071868
+/cassandra/branches/cassandra-0.7:1026516-1073884
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Feb 23 19:32:42 2011
@@ -22,7 +22,21 @@
* refactor stress.py to have only one copy of the format string
used for creating row keys (CASSANDRA-2108)
* validate index names for \w+ (CASSANDRA-2196)
- * Fix Cassandra cli to respect timeout if schema does not settle (CASSANDRA-2187)
+ * Fix Cassandra cli to respect timeout if schema does not settle
+ (CASSANDRA-2187)
+ * update memtable_throughput to be a long (CASSANDRA-2158)
+ * fix for compaction and cleanup writing old-format data into new-version
+ sstable (CASSANDRA-2211, -2216)
+ * add nodetool scrub (CASSANDRA-2217)
+ * fix sstable2json large-row pagination (CASSANDRA-2188)
+ * fix EOFing on requests for the last bytes in a file (CASSANDRA-2213)
+ * fix BRAF performance when seeking to EOF (CASSANDRA-2218)
+ * check for memtable flush_after_mins exceeded every 10s (CASSANDRA-2183)
+ * fix cache saving on Windows (CASSANDRA-2207)
+ * add validateSchemaAgreement call + synchronization to schema
+ modification operations (CASSANDRA-2222)
+ * fix for reversed slice queries on large rows (CASSANDRA-2212)
+ * fat clients were writing local data (CASSANDRA-2223)
* update memtable_throughput to be a long (CASSANDRA-2158)
@@ -34,6 +48,7 @@
0.7.1
+ * refactor MessageDigest creation code. (CASSANDRA-2107)
* buffer network stack to avoid inefficient small TCP messages while avoiding
the nagle/delayed ack problem (CASSANDRA-1896)
* check log4j configuration for changes every 10s (CASSANDRA-1525, 1907)
Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Wed Feb 23 19:32:42 2011
@@ -1,3 +1,4 @@
+<<<<<<< .working
Whatever
========
@@ -24,6 +25,23 @@ JMX
- By default, JMX now listens on port 7199.
+=======
+0.7.3
+=====
+
+Upgrading
+---------
+ - 0.7.1 and 0.7.2 shipped with a bug that caused incorrect row-level
+ bloom filters to be generated when compacting sstables generated
+ with earlier versions. This would manifest in IOExceptions during
+ column name-based queries. 0.7.3 provides "nodetool scrub" to
+ rebuild sstables with correct bloom filters, with no data lost.
+ (If your cluster was never on 0.7.0 or earlier, you don't have to
+ worry about this.) Note that nodetool scrub will snapshot your
+ data files before rebuilding, just in case.
+
+
+>>>>>>> .merge-right.r1073884
0.7.1
=====
Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Wed Feb 23 19:32:42 2011
@@ -579,6 +579,7 @@
<target name="test" depends="build-test" description="Execute unit tests">
<testmacro suitename="unit" inputdir="${test.unit.src}" timeout="60000">
<jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
+ <jvmarg value="-Dcorrupt-sstable-root=${test.data}/corrupt-sstables"/>
</testmacro>
<testmacro suitename="driverunit" inputdir="${test.src.driver}" timeout="60000">
<jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1071868
+/cassandra/branches/cassandra-0.7/contrib:1026516-1073884
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
Modified: cassandra/trunk/contrib/client_only/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/client_only/conf/cassandra.yaml?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/contrib/client_only/conf/cassandra.yaml (original)
+++ cassandra/trunk/contrib/client_only/conf/cassandra.yaml Wed Feb 23 19:32:42 2011
@@ -34,6 +34,8 @@ hinted_handoff_enabled: true
# this defines the maximum amount of time a dead host will have hints
# generated. After it has been dead this long, hints will be dropped.
max_hint_window_in_ms: 3600000 # one hour
+# Sleep this long after delivering each row or row fragment
+hinted_handoff_throttle_delay_in_ms: 50
# authentication backend, implementing IAuthenticator; used to identify users
authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
@@ -90,6 +92,31 @@ commitlog_sync: periodic
# milliseconds.
commitlog_sync_period_in_ms: 10000
+# emergency pressure valve: each time heap usage after a full (CMS)
+# garbage collection is above this fraction of the max, Cassandra will
+# flush the largest memtables.
+#
+# Set to 1.0 to disable. Setting this lower than
+# CMSInitiatingOccupancyFraction is not likely to be useful.
+#
+# RELYING ON THIS AS YOUR PRIMARY TUNING MECHANISM WILL WORK POORLY:
+# it is most effective under light to moderate load, or read-heavy
+# workloads; under truly massive write load, it will often be too
+# little, too late.
+flush_largest_memtables_at: 0.75
+
+# emergency pressure valve #2: the first time heap usage after a full
+# (CMS) garbage collection is above this fraction of the max,
+# Cassandra will reduce cache maximum _capacity_ to the given fraction
+# of the current _size_. Should usually be set substantially above
+# flush_largest_memtables_at, since flushing memtables has less
+# long-term impact on the system than shrinking the caches.
+#
+# Set to 1.0 to disable. Setting this lower than
+# CMSInitiatingOccupancyFraction is not likely to be useful.
+reduce_cache_sizes_at: 0.85
+reduce_cache_capacity_to: 0.6
+
# Addresses of hosts that are deemed contact points.
# Cassandra nodes use this list of hosts to find each other and learn
# the topology of the ring. You must change this if you are running
@@ -199,6 +226,11 @@ column_index_size_in_kb: 64
# will be logged specifying the row key.
in_memory_compaction_limit_in_mb: 64
+# Track cached row keys during compaction, and re-cache their new
+# positions in the compacted sstable. Disable if you use really large
+# key caches.
+compaction_preheat_key_cache: true
+
# Time to wait for a reply from other nodes before failing the command
rpc_timeout_in_ms: 10000
Modified: cassandra/trunk/contrib/client_only/src/ClientOnlyExample.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/client_only/src/ClientOnlyExample.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/contrib/client_only/src/ClientOnlyExample.java (original)
+++ cassandra/trunk/contrib/client_only/src/ClientOnlyExample.java Wed Feb 23 19:32:42 2011
@@ -43,8 +43,8 @@ public class ClientOnlyExample
private static final String KEYSPACE = "Keyspace1";
private static final String COLUMN_FAMILY = "Standard1";
-
- private static void testWriting() throws Exception
+
+ private static void startClient() throws Exception
{
StorageService.instance.initClient();
// sleep for a bit so that gossip can do its thing.
@@ -56,7 +56,10 @@ public class ClientOnlyExample
{
throw new AssertionError(ex);
}
+ }
+ private static void testWriting() throws Exception
+ {
// do some writing.
for (int i = 0; i < 100; i++)
{
@@ -72,22 +75,10 @@ public class ClientOnlyExample
System.out.println("wrote key" + i);
}
System.out.println("Done writing.");
- StorageService.instance.stopClient();
}
private static void testReading() throws Exception
{
- StorageService.instance.initClient();
- // sleep for a bit so that gossip can do its thing.
- try
- {
- Thread.sleep(10000L);
- }
- catch (Exception ex)
- {
- throw new AssertionError(ex);
- }
-
// do some queries.
Collection<ByteBuffer> cols = new ArrayList<ByteBuffer>()
{{
@@ -114,11 +105,6 @@ public class ClientOnlyExample
else
System.err.println("This output indicates that nothing was read.");
}
-
- // no need to do this:
- // StorageService.instance().decommission();
- // do this instead:
- StorageService.instance.stopClient();
}
/**
@@ -137,17 +123,26 @@ public class ClientOnlyExample
*/
public static void main(String args[]) throws Exception
{
- if (args.length == 0)
- System.out.println("run with \"read\" or \"write\".");
- else if ("read".equalsIgnoreCase(args[0]))
+ startClient();
+ setupKeyspace(createConnection());
+ testWriting();
+ logger.info("Writing is done. Sleeping, then will try to read.");
+ try
{
- testReading();
+ Thread.currentThread().sleep(10000);
}
- else if ("write".equalsIgnoreCase(args[0]))
+ catch (InterruptedException ex)
{
- setupKeyspace(createConnection());
- testWriting();
+ throw new RuntimeException(ex);
}
+
+ testReading();
+
+ // no need to do this:
+ // StorageService.instance().decommission();
+ // do this instead:
+ StorageService.instance.stopClient();
+ System.exit(0); // the only way to really stop the process.
}
/**
@@ -159,15 +154,22 @@ public class ClientOnlyExample
CfDef columnFamily = new CfDef(KEYSPACE, COLUMN_FAMILY);
cfDefList.add(columnFamily);
- client.system_add_keyspace(new KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", 1, cfDefList));
- int magnitude = client.describe_ring(KEYSPACE).size();
- try
+ try
{
- Thread.sleep(1000 * magnitude);
+ client.system_add_keyspace(new KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", 1, cfDefList));
+ int magnitude = client.describe_ring(KEYSPACE).size();
+ try
+ {
+ Thread.sleep(1000 * magnitude);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- catch (InterruptedException e)
+ catch (InvalidRequestException probablyExists)
{
- throw new RuntimeException(e);
+ logger.warn("Problem creating keyspace: " + probablyExists.getMessage());
}
}
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java Wed Feb 23 19:32:42 2011
@@ -162,7 +162,7 @@ public class Session
STDev = Float.parseFloat(cmd.getOptionValue("s"));
if (cmd.hasOption("r"))
- random = Boolean.parseBoolean(cmd.getOptionValue("r"));
+ random = true;
if (cmd.hasOption("f"))
{
Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java Wed Feb 23 19:32:42 2011
@@ -17,18 +17,18 @@
*/
package org.apache.cassandra.contrib.stress.util;
-import org.apache.cassandra.contrib.stress.Session;
-import org.apache.cassandra.contrib.stress.Stress;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.InvalidRequestException;
-
import java.math.BigInteger;
import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import org.apache.cassandra.contrib.stress.Session;
+import org.apache.cassandra.contrib.stress.Stress;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.FBUtilities;
+
public abstract class OperationThread extends Thread
{
public final int index;
@@ -125,7 +125,9 @@ public abstract class OperationThread ex
private double nextGaussian(int mu, float sigma)
{
Random random = Stress.randomizer;
+
Double currentState = nextGaussian;
+ nextGaussian = null;
if (currentState == null)
{
@@ -146,21 +148,14 @@ public abstract class OperationThread ex
*/
private String getMD5(String input)
{
- try
- {
- MessageDigest md = MessageDigest.getInstance("MD5");
- byte[] messageDigest = md.digest(input.getBytes());
- StringBuilder hash = new StringBuilder(new BigInteger(1, messageDigest).toString(16));
+ MessageDigest md = FBUtilities.threadLocalMD5Digest();
+ byte[] messageDigest = md.digest(input.getBytes());
+ StringBuilder hash = new StringBuilder(new BigInteger(1, messageDigest).toString(16));
- while (hash.length() < 32)
- hash.append("0").append(hash);
+ while (hash.length() < 32)
+ hash.append("0").append(hash);
- return hash.toString();
- }
- catch (NoSuchAlgorithmException e)
- {
- throw new RuntimeException(e);
- }
+ return hash.toString();
}
/**
Modified: cassandra/trunk/debian/init
URL: http://svn.apache.org/viewvc/cassandra/trunk/debian/init?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/debian/init (original)
+++ cassandra/trunk/debian/init Wed Feb 23 19:32:42 2011
@@ -119,6 +119,8 @@ do_start()
# 2 if daemon could not be started
is_running && return 1
+ ulimit -l unlimited
+
cassandra_home=`getent passwd cassandra | awk -F ':' '{ print $6; }'`
cd / # jsvc doesn't chdir() for us
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1071868
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1073884
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1071868
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1073884
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1071868
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1073884
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1071868
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1073884
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 23 19:32:42 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1071868
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1073884
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/auth/SimpleAuthenticator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/auth/SimpleAuthenticator.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/auth/SimpleAuthenticator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/auth/SimpleAuthenticator.java Wed Feb 23 19:32:42 2011
@@ -26,7 +26,6 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.Properties;
@@ -111,16 +110,12 @@ public class SimpleAuthenticator impleme
authenticated = password.equals(props.getProperty(username));
break;
case MD5:
- authenticated = MessageDigest.isEqual(MessageDigest.getInstance("MD5").digest(password.getBytes()), FBUtilities.hexToBytes(props.getProperty(username)));
+ authenticated = MessageDigest.isEqual(FBUtilities.threadLocalMD5Digest().digest(password.getBytes()), FBUtilities.hexToBytes(props.getProperty(username)));
break;
default:
throw new RuntimeException("Unknown PasswordMode " + mode);
}
}
- catch (NoSuchAlgorithmException e)
- {
- throw new RuntimeException("You requested MD5 checking but the MD5 digest algorithm is not available: " + e.getMessage());
- }
catch (IOException e)
{
throw new RuntimeException("Authentication table file given by property " + PASSWD_FILENAME_PROPERTY + " could not be opened: " + e.getMessage());
Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Wed Feb 23 19:32:42 2011
@@ -25,6 +25,7 @@ import java.nio.charset.CharacterCodingE
import java.util.*;
import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
import org.antlr.runtime.tree.Tree;
import org.apache.cassandra.auth.SimpleAuthenticator;
@@ -33,6 +34,7 @@ import org.apache.cassandra.db.ColumnFam
import org.apache.cassandra.db.CompactionManagerMBean;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -753,8 +755,8 @@ public class CliClient extends CliUserHe
KsDef updatedKsDef = updateKsDefAttributes(statement, currentKsDef);
String mySchemaVersion = thriftClient.system_update_keyspace(updatedKsDef);
- validateSchemaIsSettled(mySchemaVersion);
sessionState.out.println(mySchemaVersion);
+ validateSchemaIsSettled(mySchemaVersion);
keyspacesMap.put(keyspaceName, thriftClient.describe_keyspace(keyspaceName));
}
catch (InvalidRequestException e)
@@ -2087,13 +2089,13 @@ public class CliClient extends CliUserHe
/** validates schema is propagated to all nodes */
private void validateSchemaIsSettled(String currentVersionId)
{
- Map<String, List<String>> versions;
-
- long start = System.currentTimeMillis();
- long limit = start + sessionState.schema_mwt;
+ sessionState.out.println("Waiting for schema agreement...");
+ Map<String, List<String>> versions = null;
+ long limit = System.currentTimeMillis() + sessionState.schema_mwt;
boolean inAgreement = false;
- while (limit - start >= 0)
+ outer:
+ while (limit - System.currentTimeMillis() >= 0 && !inAgreement)
{
try
{
@@ -2105,29 +2107,23 @@ public class CliClient extends CliUserHe
continue;
}
- boolean currentlyInAgreement = true;
for (String version : versions.keySet())
{
- if (!version.equals(currentVersionId))
- {
- currentlyInAgreement = false;
- break; // only one disagreement is enough
- }
- }
-
- if (currentlyInAgreement)
- {
- inAgreement = true;
- break; // all nodes are in agreement no need to loop
+ if (!version.equals(currentVersionId) && !version.equals(StorageProxy.UNREACHABLE))
+ continue outer;
}
- start = System.currentTimeMillis();
+ inAgreement = true;
}
+ if (versions.containsKey(StorageProxy.UNREACHABLE))
+ sessionState.err.printf("Warning: unreachable nodes %s", Joiner.on(", ").join(versions.get(StorageProxy.UNREACHABLE)));
if (!inAgreement)
{
- sessionState.err.printf("The schema has not settled in %d seconds and further migrations are ill-advised until it does.%n", sessionState.schema_mwt / 1000);
+ sessionState.err.printf("The schema has not settled in %d seconds; further migrations are ill-advised until it does.%nVersions are %s%n",
+ sessionState.schema_mwt / 1000, FBUtilities.toString(versions));
System.exit(-1);
}
+ sessionState.out.println("... schemas agree across the cluster");
}
private static class CfDefNamesComparator implements Comparator<CfDef>
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed Feb 23 19:32:42 2011
@@ -212,6 +212,7 @@ public final class CFMetaData
this.memtableThroughputInMb = memtableThroughputInMb == null
? DEFAULT_MEMTABLE_THROUGHPUT_IN_MB
: memtableThroughputInMb;
+
this.memtableOperationsInMillions = memtableOperationsInMillions == null
? DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS
: memtableOperationsInMillions;
@@ -904,28 +905,22 @@ public final class CFMetaData
public static void validateMemtableSettings(org.apache.cassandra.thrift.CfDef cf_def) throws ConfigurationException
{
- if (cf_def.isSetMemtable_flush_after_mins() && cf_def.memtable_flush_after_mins <= 0) {
- throw new ConfigurationException("memtable_flush_after_mins cannot be non-positive");
- }
- if (cf_def.isSetMemtable_throughput_in_mb() && cf_def.memtable_throughput_in_mb <= 0) {
- throw new ConfigurationException("memtable_throughput_in_mb cannot be non-positive.");
- }
- if (cf_def.isSetMemtable_operations_in_millions() && cf_def.memtable_operations_in_millions <= 0) {
- throw new ConfigurationException("memtable_operations_in_millions cannot be non-positive");
- }
+ if (cf_def.isSetMemtable_flush_after_mins())
+ DatabaseDescriptor.validateMemtableFlushPeriod(cf_def.memtable_flush_after_mins);
+ if (cf_def.isSetMemtable_throughput_in_mb())
+ DatabaseDescriptor.validateMemtableThroughput(cf_def.memtable_throughput_in_mb);
+ if (cf_def.isSetMemtable_operations_in_millions())
+ DatabaseDescriptor.validateMemtableOperations(cf_def.memtable_operations_in_millions);
}
public static void validateMemtableSettings(org.apache.cassandra.db.migration.avro.CfDef cf_def) throws ConfigurationException
{
- if (cf_def.memtable_flush_after_mins != null && cf_def.memtable_flush_after_mins <= 0) {
- throw new ConfigurationException("memtable_flush_after_mins cannot be non-positive");
- }
- if (cf_def.memtable_throughput_in_mb != null && cf_def.memtable_throughput_in_mb <= 0) {
- throw new ConfigurationException("memtable_throughput_in_mb cannot be non-positive.");
- }
- if (cf_def.memtable_operations_in_millions != null && cf_def.memtable_operations_in_millions <= 0) {
- throw new ConfigurationException("memtable_operations_in_millions cannot be non-positive");
- }
+ if (cf_def.memtable_flush_after_mins != null)
+ DatabaseDescriptor.validateMemtableFlushPeriod(cf_def.memtable_flush_after_mins);
+ if (cf_def.memtable_throughput_in_mb != null)
+ DatabaseDescriptor.validateMemtableThroughput(cf_def.memtable_throughput_in_mb);
+ if (cf_def.memtable_operations_in_millions != null)
+ DatabaseDescriptor.validateMemtableOperations(cf_def.memtable_operations_in_millions);
}
@Override
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Feb 23 19:32:42 2011
@@ -112,7 +112,7 @@ public class DatabaseDescriptor
return url;
}
-
+
static
{
try
@@ -369,10 +369,6 @@ public class DatabaseDescriptor
throw new ConfigurationException("saved_caches_directory missing");
}
- /* threshold after which commit log should be rotated. */
- if (conf.commitlog_rotation_threshold_in_mb != null)
- CommitLog.setSegmentSize(conf.commitlog_rotation_threshold_in_mb * 1024 * 1024);
-
// Hardcoded system tables
KSMetaData systemMeta = new KSMetaData(Table.SYSTEM_TABLE,
LocalStrategy.class,
@@ -918,6 +914,14 @@ public class DatabaseDescriptor
currentIndex = (currentIndex + 1) % conf.data_file_directories.length;
return dataFileDirectory;
}
+
+ /* threshold after which commit log should be rotated. */
+ public static int getCommitLogSegmentSize()
+ {
+ return conf.commitlog_rotation_threshold_in_mb != null ?
+ conf.commitlog_rotation_threshold_in_mb * 1024 * 1024 :
+ 128*1024*1024;
+ }
public static String getCommitLogLocation()
{
@@ -1201,4 +1205,24 @@ public class DatabaseDescriptor
{
return conf.compaction_preheat_key_cache;
}
+
+ public static void validateMemtableThroughput(int sizeInMB) throws ConfigurationException
+ {
+ if (sizeInMB <= 0)
+ throw new ConfigurationException("memtable_throughput_in_mb must be greater than 0.");
+ }
+
+ public static void validateMemtableOperations(double operationsInMillions) throws ConfigurationException
+ {
+ if (operationsInMillions <= 0)
+ throw new ConfigurationException("memtable_operations_in_millions must be greater than 0.0.");
+ if (operationsInMillions > Long.MAX_VALUE / (1024 * 1024)) // parenthesized: "/ 1024 * 1024" evaluated left to right to roughly Long.MAX_VALUE
+ throw new ConfigurationException("memtable_operations_in_millions must be less than " + Long.MAX_VALUE / (1024 * 1024));
+ }
+
+ public static void validateMemtableFlushPeriod(int minutes) throws ConfigurationException
+ {
+ if (minutes <= 0)
+ throw new ConfigurationException("memtable_flush_after_mins must be greater than 0.");
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Wed Feb 23 19:32:42 2011
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
@@ -373,18 +372,9 @@ public class ColumnFamily implements ICo
public static ByteBuffer digest(ColumnFamily cf)
{
- MessageDigest digest;
- try
- {
- digest = MessageDigest.getInstance("MD5");
- }
- catch (NoSuchAlgorithmException e)
- {
- throw new AssertionError(e);
- }
+ MessageDigest digest = FBUtilities.threadLocalMD5Digest();
if (cf != null)
cf.updateDigest(digest);
-
return ByteBuffer.wrap(digest.digest());
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Feb 23 19:32:42 2011
@@ -39,6 +39,7 @@ import org.apache.cassandra.concurrent.N
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.commitlog.CommitLog;
@@ -977,6 +978,12 @@ public class ColumnFamilyStore implement
CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
}
+ public void scrub() throws ExecutionException, InterruptedException
+ {
+ snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis());
+ CompactionManager.instance.performScrub(ColumnFamilyStore.this);
+ }
+
void markCompacted(Collection<SSTableReader> sstables)
{
ssTables.markCompacted(sstables);
@@ -1021,12 +1028,12 @@ public class ColumnFamilyStore implement
flushable.flushAndSignal(latch, flushSorter, flushWriter);
}
- public int getMemtableColumnsCount()
+ public long getMemtableColumnsCount()
{
return getMemtableThreadSafe().getCurrentOperations();
}
- public int getMemtableDataSize()
+ public long getMemtableDataSize()
{
return getMemtableThreadSafe().getCurrentThroughput();
}
@@ -1668,26 +1675,8 @@ public class ColumnFamilyStore implement
return metadata.comparator;
}
- /**
- * Take a snap shot of this columnfamily store.
- *
- * @param snapshotName the name of the associated with the snapshot
- */
- public void snapshot(String snapshotName)
+ private void snapshotWithoutFlush(String snapshotName)
{
- try
- {
- forceBlockingFlush();
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
-
for (SSTableReader ssTable : ssTables)
{
try
@@ -1710,6 +1699,30 @@ public class ColumnFamilyStore implement
}
}
+
+ /**
+ * Take a snap shot of this columnfamily store.
+ *
+ * @param snapshotName the name associated with the snapshot
+ */
+ public void snapshot(String snapshotName)
+ {
+ try
+ {
+ forceBlockingFlush();
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ snapshotWithoutFlush(snapshotName);
+ }
+
public boolean hasUnreclaimedSpace()
{
return ssTables.getLiveSize() < ssTables.getTotalSize();
@@ -2012,24 +2025,20 @@ public class ColumnFamilyStore implement
{
return memsize.value();
}
- public void setMemtableThroughputInMB(int size)
+ public void setMemtableThroughputInMB(int size) throws ConfigurationException
{
- if (size <= 0) {
- throw new RuntimeException("MemtableThroughputInMB must be greater than 0.");
- }
- this.memsize.set(size);
+ DatabaseDescriptor.validateMemtableThroughput(size);
+ memsize.set(size);
}
public double getMemtableOperationsInMillions()
{
return memops.value();
}
- public void setMemtableOperationsInMillions(double ops)
+ public void setMemtableOperationsInMillions(double ops) throws ConfigurationException
{
- if (ops <= 0) {
- throw new RuntimeException("MemtableOperationsInMillions must be greater than 0.0.");
- }
- this.memops.set(ops);
+ DatabaseDescriptor.validateMemtableOperations(ops);
+ memops.set(ops);
}
public long estimateKeys()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Wed Feb 23 19:32:42 2011
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import org.apache.cassandra.config.ConfigurationException;
+
/**
* The MBean interface for ColumnFamilyStore
*/
@@ -38,14 +40,14 @@ public interface ColumnFamilyStoreMBean
*
* @return The size in bytes.
*/
- public int getMemtableDataSize();
+ public long getMemtableDataSize();
/**
* Returns the total number of columns present in the memtable.
*
* @return The number of columns.
*/
- public int getMemtableColumnsCount();
+ public long getMemtableColumnsCount();
/**
* Returns the number of times that a flush has resulted in the
@@ -211,10 +213,10 @@ public interface ColumnFamilyStoreMBean
public void setMemtableFlushAfterMins(int time);
public int getMemtableThroughputInMB();
- public void setMemtableThroughputInMB(int size);
+ public void setMemtableThroughputInMB(int size) throws ConfigurationException;
public double getMemtableOperationsInMillions();
- public void setMemtableOperationsInMillions(double ops);
+ public void setMemtableOperationsInMillions(double ops) throws ConfigurationException;
public long estimateKeys();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Feb 23 19:32:42 2011
@@ -41,14 +41,14 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.AbstractCompactedRow;
-import org.apache.cassandra.io.CompactionIterator;
-import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.*;
import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -120,7 +120,7 @@ public class CompactionManager implement
Collections.sort(sstables);
int gcBefore = cfs.isIndex()
? Integer.MAX_VALUE
- : (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+ : getDefaultGcBefore(cfs);
return doCompaction(cfs,
sstables.subList(0, Math.min(sstables.size(), maxThreshold)),
gcBefore);
@@ -182,9 +182,31 @@ public class CompactionManager implement
executor.submit(runnable).get();
}
+ public void performScrub(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
+ {
+ Callable<Object> runnable = new Callable<Object>()
+ {
+ public Object call() throws IOException
+ {
+ compactionLock.lock();
+ try
+ {
+ if (!cfStore.isInvalid())
+ doScrub(cfStore);
+ return this;
+ }
+ finally
+ {
+ compactionLock.unlock();
+ }
+ }
+ };
+ executor.submit(runnable).get();
+ }
+
public void performMajor(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
{
- submitMajor(cfStore, 0, (int) (System.currentTimeMillis() / 1000) - cfStore.metadata.getGcGraceSeconds()).get();
+ submitMajor(cfStore, 0, getDefaultGcBefore(cfStore)).get();
}
public Future<Object> submitMajor(final ColumnFamilyStore cfStore, final long skip, final int gcBefore)
@@ -257,7 +279,7 @@ public class CompactionManager implement
}
ColumnFamilyStore cfs = Table.open(ksname).getColumnFamilyStore(cfname);
- submitUserDefined(cfs, descriptors, (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds());
+ submitUserDefined(cfs, descriptors, getDefaultGcBefore(cfs));
}
private Future<Object> submitUserDefined(final ColumnFamilyStore cfs, final Collection<Descriptor> dataFiles, final int gcBefore)
@@ -476,6 +498,104 @@ public class CompactionManager implement
}
/**
+ * Deserialize everything in the CFS and re-serialize w/ the newest version. Also attempts to recover
+ * from bogus row keys / sizes using data from the index, and skips rows with garbage columns that resulted
+ * from early ByteBuffer bugs.
+ *
+ * @throws IOException
+ */
+ private void doScrub(ColumnFamilyStore cfs) throws IOException
+ {
+ assert !cfs.isIndex();
+ Table table = cfs.table;
+ Collection<Range> ranges = StorageService.instance.getLocalRanges(table.name);
+
+ for (final SSTableReader sstable : cfs.getSSTables())
+ {
+ logger.info("Scrubbing " + sstable);
+
+ // Calculate the expected compacted filesize
+ String compactionFileLocation = table.getDataFileLocation(sstable.length());
+ if (compactionFileLocation == null)
+ throw new IOException("disk full");
+
+ int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
+ (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
+ if (logger.isDebugEnabled())
+ logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+
+ // loop through each row, deserializing to check for damage.
+ // we'll also loop through the index at the same time, using the position from the index to recover if the
+ // row header (key or data size) is corrupt. (This means our position in the index file will be one row
+ // "ahead" of the data file.)
+ final BufferedRandomAccessFile dataFile = BufferedRandomAccessFile.getUncachingReader(sstable.getFilename());
+ String indexFilename = sstable.descriptor.filenameFor(Component.PRIMARY_INDEX);
+ BufferedRandomAccessFile indexFile = BufferedRandomAccessFile.getUncachingReader(indexFilename);
+ ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
+ // read the first index position outside the assert: a side effect inside an assert is skipped when
+ // assertions are disabled, which would leave indexFile misaligned for every later read
+ long firstRowPositionFromIndex = indexFile.readLong();
+ assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
+
+ SSTableWriter writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null);
+ executor.beginCompaction(cfs.columnFamily, new ScrubInfo(dataFile, sstable));
+
+ while (!dataFile.isEOF())
+ {
+ long rowStart = dataFile.getFilePointer();
+ DecoratedKey key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+ ByteBuffer currentIndexKey = nextIndexKey;
+ nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+ long nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
+
+ long dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
+ long dataStart = dataFile.getFilePointer();
+
+ SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+ writer.mark();
+ try
+ {
+ writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
+ }
+ catch (Exception e)
+ {
+ logger.warn("Error reading row " + ByteBufferUtil.bytesToHex(key.key) + "(stacktrace follows)", e);
+ writer.reset();
+
+ long dataStartFromIndex = rowStart + 2 + currentIndexKey.remaining();
+ if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
+ {
+ logger.info(String.format("Retrying %s as key %s from row index",
+ ByteBufferUtil.bytesToHex(key.key), ByteBufferUtil.bytesToHex(currentIndexKey)));
+ key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
+ long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+ row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+ try
+ {
+ writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
+ }
+ catch (Exception e2)
+ {
+ logger.info("Retry failed too. Skipping to next row (retry's stacktrace follows)", e2);
+ writer.reset();
+ dataFile.seek(nextRowPositionFromIndex);
+ }
+ }
+ else
+ {
+ logger.info("Skipping to next row");
+ dataFile.seek(nextRowPositionFromIndex);
+ }
+ }
+ }
+
+ SSTableReader newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+ cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable));
+ logger.info("Scrub of " + sstable + " complete");
+ }
+ }
+
+ /**
* This function goes over each file and removes the keys that the node is not responsible for
* and only keeps keys that this node is responsible for.
*
@@ -500,7 +617,7 @@ public class CompactionManager implement
long totalkeysWritten = 0;
int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
- (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable)) / 2));
+ (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
@@ -516,7 +633,7 @@ public class CompactionManager implement
if (Range.isTokenInRanges(row.getKey().token, ranges))
{
writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer);
- writer.append(new EchoedRow(row));
+ writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
totalkeysWritten++;
}
else
@@ -569,6 +686,21 @@ public class CompactionManager implement
}
}
+ /**
+ * @return an AbstractCompactedRow implementation to write the row in question.
+ * If the data is from a current-version sstable and forceDeserialize is false, write it unchanged.
+ * Otherwise, re-serialize it in the latest version.
+ */
+ private AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row, ColumnFamilyStore cfs, Descriptor descriptor, boolean forceDeserialize)
+ {
+ if (descriptor.isLatestVersion && !forceDeserialize)
+ return new EchoedRow(row);
+
+ return row.dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()
+ ? new LazilyCompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs), forceDeserialize)
+ : new PrecompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs), forceDeserialize);
+ }
+
private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer)
throws IOException
{
@@ -753,11 +885,16 @@ public class CompactionManager implement
return executor.submit(runnable);
}
+ private static int getDefaultGcBefore(ColumnFamilyStore cfs)
+ {
+ return (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
+ }
+
private static class ValidationCompactionIterator extends CompactionIterator
{
public ValidationCompactionIterator(ColumnFamilyStore cfs) throws IOException
{
- super(cfs, cfs.getSSTables(), (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds(), true);
+ super(cfs, cfs.getSSTables(), getDefaultGcBefore(cfs), true);
}
@Override
@@ -966,4 +1103,38 @@ public class CompactionManager implement
return "Cleanup of " + sstable.getColumnFamilyName();
}
}
+
+ private static class ScrubInfo implements ICompactionInfo
+ {
+ private final BufferedRandomAccessFile dataFile;
+ private final SSTableReader sstable;
+
+ public ScrubInfo(BufferedRandomAccessFile dataFile, SSTableReader sstable)
+ {
+ this.dataFile = dataFile;
+ this.sstable = sstable;
+ }
+
+ public long getTotalBytes()
+ {
+ try
+ {
+ return dataFile.length();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public long getBytesComplete()
+ {
+ return dataFile.getFilePointer();
+ }
+
+ public String getTaskType()
+ {
+ return "Scrub " + sstable;
+ }
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Feb 23 19:32:42 2011
@@ -255,6 +255,15 @@ public class HintedHandOffManager implem
{
Gossiper gossiper = Gossiper.instance;
int waited = 0;
+ // first, wait for schema to be gossiped.
+ while (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null) {
+ Thread.sleep(1000);
+ waited += 1000;
+ if (waited > 2 * StorageService.RING_DELAY)
+ throw new RuntimeException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
+ }
+ waited = 0;
+ // then wait for the correct schema version.
while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
gossiper.getEndpointStateForEndpoint(FBUtilities.getLocalAddress()).getApplicationState(ApplicationState.SCHEMA).value))
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Feb 23 19:32:42 2011
@@ -30,12 +30,14 @@ import java.util.concurrent.ConcurrentSk
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
import org.apache.cassandra.db.filter.AbstractColumnIterator;
@@ -52,23 +54,23 @@ public class Memtable implements Compara
private boolean isFrozen;
- private final AtomicInteger currentThroughput = new AtomicInteger(0);
- private final AtomicInteger currentOperations = new AtomicInteger(0);
+ private final AtomicLong currentThroughput = new AtomicLong(0);
+ private final AtomicLong currentOperations = new AtomicLong(0);
private final long creationTime;
private final ConcurrentNavigableMap<DecoratedKey, ColumnFamily> columnFamilies = new ConcurrentSkipListMap<DecoratedKey, ColumnFamily>();
public final ColumnFamilyStore cfs;
- private final int THRESHOLD;
- private final int THRESHOLD_COUNT;
+ private final long THRESHOLD;
+ private final long THRESHOLD_COUNT;
public Memtable(ColumnFamilyStore cfs)
{
this.cfs = cfs;
creationTime = System.currentTimeMillis();
- this.THRESHOLD = cfs.getMemtableThroughputInMB() * 1024 * 1024;
- this.THRESHOLD_COUNT = (int) (cfs.getMemtableOperationsInMillions() * 1024 * 1024);
+ THRESHOLD = cfs.getMemtableThroughputInMB() * 1024L * 1024L; // long arithmetic: the int product wraps at 2048 MB before widening
+ THRESHOLD_COUNT = (long) (cfs.getMemtableOperationsInMillions() * 1024 * 1024);
}
/**
@@ -88,12 +90,12 @@ public class Memtable implements Compara
return 0;
}
- public int getCurrentThroughput()
+ public long getCurrentThroughput()
{
return currentThroughput.get();
}
- public int getCurrentOperations()
+ public long getCurrentOperations()
{
return currentOperations.get();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Feb 23 19:32:42 2011
@@ -189,6 +189,9 @@ public class RowMutation implements IMut
Table.open(table_).apply(this, true);
}
+ /**
+ * Apply without touching the commitlog. For testing.
+ */
public void applyUnsafe() throws IOException
{
Table.open(table_).apply(this, false);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Feb 23 19:32:42 2011
@@ -70,15 +70,18 @@ public class Table
// It is possible to call Table.open without a running daemon, so it makes sense to ensure
// proper directories here as well as in CassandraDaemon.
- static
+ static
{
- try
- {
- DatabaseDescriptor.createAllDirectories();
- }
- catch (IOException ex)
+ if (!StorageService.instance.isClientMode())
{
- throw new IOError(ex);
+ try
+ {
+ DatabaseDescriptor.createAllDirectories();
+ }
+ catch (IOException ex)
+ {
+ throw new IOError(ex);
+ }
}
}
@@ -160,36 +163,6 @@ public class Table
}
/**
- * Do a cleanup of keys that do not belong locally.
- */
- public void forceCleanup() throws IOException, ExecutionException, InterruptedException
- {
- if (name.equals(SYSTEM_TABLE))
- throw new UnsupportedOperationException("Cleanup of the system table is neither necessary nor wise");
-
- // Sort the column families in order of SSTable size, so cleanup of smaller CFs
- // can free up space for larger ones
- List<ColumnFamilyStore> sortedColumnFamilies = new ArrayList<ColumnFamilyStore>(columnFamilyStores.values());
- Collections.sort(sortedColumnFamilies, new Comparator<ColumnFamilyStore>()
- {
- // Compare first on size and, if equal, sort by name (arbitrary & deterministic).
- public int compare(ColumnFamilyStore cf1, ColumnFamilyStore cf2)
- {
- long diff = (cf1.getTotalDiskSpaceUsed() - cf2.getTotalDiskSpaceUsed());
- if (diff > 0)
- return 1;
- if (diff < 0)
- return -1;
- return cf1.columnFamily.compareTo(cf2.columnFamily);
- }
- });
-
- // Cleanup in sorted order to free up space for the larger ones
- for (ColumnFamilyStore cfs : sortedColumnFamilies)
- cfs.forceCleanup();
- }
-
- /**
* Take a snapshot of the entire set of column families with a given timestamp.
*
* @param clientSuppliedName the tag associated with the name of the snapshot. This
@@ -238,16 +211,6 @@ public class Table
}
}
- /*
- * This method is an ADMIN operation to force compaction
- * of all SSTables on disk.
- */
- public void forceCompaction() throws IOException, ExecutionException, InterruptedException
- {
- for (ColumnFamilyStore cfStore : columnFamilyStores.values())
- CompactionManager.instance.performMajor(cfStore);
- }
-
/**
* @return A list of open SSTableReaders (TODO: ensure that the caller doesn't modify these).
*/
@@ -282,7 +245,8 @@ public class Table
try
{
String keyspaceDir = dataDir + File.separator + table;
- FileUtils.createDirectory(keyspaceDir);
+ if (!StorageService.instance.isClientMode())
+ FileUtils.createDirectory(keyspaceDir);
// remove the deprecated streaming directory.
File streamingDir = new File(keyspaceDir, "stream");
@@ -301,13 +265,6 @@ public class Table
initCf(cfm.cfId, cfm.cfName);
}
- // check 10x as often as the shortest lifetime, so we can exceed all lifetimes by 10% at most
- int minCheckMs = Integer.MAX_VALUE;
- for (ColumnFamilyStore cfs : columnFamilyStores.values())
- {
- minCheckMs = Math.min(minCheckMs, cfs.getMemtableFlushAfterMins() * 60 * 1000 / 10);
- }
-
Runnable runnable = new Runnable()
{
public void run()
@@ -318,7 +275,7 @@ public class Table
}
}
};
- flushTask = StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, minCheckMs, minCheckMs, TimeUnit.MILLISECONDS);
+ flushTask = StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.SECONDS);
}
public void createReplicationStrategy(KSMetaData ksm) throws ConfigurationException
@@ -370,15 +327,7 @@ public class Table
cfName, cfId, columnFamilyStores.get(cfId));
columnFamilyStores.put(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName));
}
-
- public void reloadCf(Integer cfId) throws IOException
- {
- ColumnFamilyStore cfs = columnFamilyStores.remove(cfId);
- assert cfs != null;
- unloadCf(cfs);
- initCf(cfId, cfs.getColumnFamilyName());
- }
-
+
/** basically a combined drop and add */
public void renameCf(Integer cfId, String newName) throws IOException
{
@@ -758,4 +707,9 @@ public class Table
cfs.truncate().get();
logger.debug("Truncation done.");
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "(name='" + name + "')";
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java Wed Feb 23 19:32:42 2011
@@ -146,8 +146,6 @@ class IndexedSliceReader extends Abstrac
file.readInt(); // column count
this.mark = file.mark();
curRangeIndex = IndexHelper.indexFor(startColumn, indexes, comparator, reversed);
- if (reversed && curRangeIndex == indexes.size())
- curRangeIndex--;
}
public boolean getNextBlock() throws IOException
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java Wed Feb 23 19:32:42 2011
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.cassandra.db.ColumnFamilySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,7 +109,13 @@ public class SSTableNamesIterator extend
// we can stop early if bloom filter says none of the columns actually exist -- but,
// we can't stop before initializing the cf above, in case there's a relevant tombstone
- cf = ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(sstable.metadata), file);
+ ColumnFamilySerializer serializer = ColumnFamily.serializer();
+ try {
+ cf = serializer.deserializeFromSSTableNoColumns(ColumnFamily.create(sstable.metadata), file);
+ } catch (Exception e) {
+ throw new IOException
+ (serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e);
+ }
List<ByteBuffer> filteredColumnNames = new ArrayList<ByteBuffer>(columns.size());
for (ByteBuffer name : columns)
@@ -153,7 +160,7 @@ public class SSTableNamesIterator extend
/* get the various column ranges we have to read */
AbstractType comparator = metadata.comparator;
- SortedSet<IndexHelper.IndexInfo> ranges = new TreeSet<IndexHelper.IndexInfo>(IndexHelper.getComparator(comparator));
+ SortedSet<IndexHelper.IndexInfo> ranges = new TreeSet<IndexHelper.IndexInfo>(IndexHelper.getComparator(comparator, false));
for (ByteBuffer name : filteredColumnNames)
{
int index = IndexHelper.indexFor(name, indexList, comparator, false);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Wed Feb 23 19:32:42 2011
@@ -79,20 +79,16 @@ import org.apache.cassandra.utils.Wrappe
public class CommitLog
{
private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
- private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
-
+
static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
public static final CommitLog instance = new CommitLog();
private final Deque<CommitLogSegment> segments = new ArrayDeque<CommitLogSegment>();
- public static void setSegmentSize(int size)
- {
- SEGMENT_SIZE = size;
- }
-
private final ICommitLogExecutorService executor;
+
+ private volatile int segmentSize = 128*1024*1024; // roll after log gets this big
/**
* param @ table - name of table for which we are maintaining
@@ -105,6 +101,7 @@ public class CommitLog
try
{
DatabaseDescriptor.createAllDirectories();
+ segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
}
catch (IOException e)
{
@@ -479,7 +476,7 @@ public class CommitLog
{
currentSegment().write(rowMutation);
// roll log if necessary
- if (currentSegment().length() >= SEGMENT_SIZE)
+ if (currentSegment().length() >= segmentSize)
{
sync();
segments.add(new CommitLogSegment());
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Wed Feb 23 19:32:42 2011
@@ -210,4 +210,13 @@ public class QueryFilter
{
return new QueryFilter(key, path, new NamesQueryFilter(column));
}
+
+ // Renders key and path unconditionally; filter and superFilter are appended only when non-null.
+ @Override
+ public String toString() {
+ String text = getClass().getSimpleName() + "(key=" + key + ", path=" + path;
+ if (filter != null) text += ", filter=" + filter;
+ if (superFilter != null) text += ", superFilter=" + superFilter;
+ return text + ")";
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java Wed Feb 23 19:32:42 2011
@@ -71,7 +71,7 @@ public class QueryPath
@Override
public String toString()
{
- return "QueryPath(" +
+ return getClass().getSimpleName() + "(" +
"columnFamilyName='" + columnFamilyName + '\'' +
", superColumnName='" + superColumnName + '\'' +
", columnName='" + columnName + '\'' +
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java Wed Feb 23 19:32:42 2011
@@ -143,4 +143,13 @@ public class SliceQueryFilter implements
container.addColumn(column);
}
}
+
+ /** Returns {@code <SimpleClassName>(start=..., finish=..., reversed=..., count=...)}. */
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "(start=" + start +
+ ", finish=" + finish +
+ ", reversed=" + reversed +
+ ", count=" + count + ")"; // ")" balances the "(" opened right after the class name
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/AddColumnFamily.java Wed Feb 23 19:32:42 2011
@@ -1,13 +1,12 @@
package org.apache.cassandra.db.migration;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -52,7 +51,13 @@ public class AddColumnFamily extends Mig
cfm.tableName));
else if (!Migration.isLegalName(cfm.cfName))
throw new ConfigurationException("Invalid column family name: " + cfm.cfName);
-
+ for (Map.Entry<ByteBuffer, ColumnDefinition> entry : cfm.getColumn_metadata().entrySet())
+ {
+ String indexName = entry.getValue().getIndexName();
+ if (indexName != null && !Migration.isLegalName(indexName))
+ throw new ConfigurationException("Invalid index name: " + indexName);
+ }
+
// clone ksm but include the new cf def.
KSMetaData newKsm = makeNewKeyspaceDefinition(ksm);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Wed Feb 23 19:32:42 2011
@@ -83,13 +83,16 @@ public abstract class Migration
protected transient boolean clientMode;
/** Subclasses must have a matching constructor */
- protected Migration() { /* pass */ }
+ protected Migration()
+ {
+ clientMode = StorageService.instance.isClientMode();
+ }
Migration(UUID newVersion, UUID lastVersion)
{
+ this();
this.newVersion = newVersion;
this.lastVersion = lastVersion;
- clientMode = StorageService.instance.isClientMode();
}
// block compactions and flushing.
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/UpdateColumnFamily.java Wed Feb 23 19:32:42 2011
@@ -1,13 +1,13 @@
package org.apache.cassandra.db.migration;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.migration.avro.ColumnDef;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@ -45,7 +45,15 @@ public class UpdateColumnFamily extends
KSMetaData ksm = DatabaseDescriptor.getTableDefinition(cf_def.keyspace.toString());
if (ksm == null)
throw new ConfigurationException("No such keyspace: " + cf_def.keyspace.toString());
-
+ if (cf_def.column_metadata != null)
+ {
+ for (ColumnDef entry : cf_def.column_metadata)
+ {
+ if (entry.index_name != null && !Migration.isLegalName((String) entry.index_name))
+ throw new ConfigurationException("Invalid index name: " + entry.index_name);
+ }
+ }
+
CFMetaData oldCfm = DatabaseDescriptor.getCFMetaData(CFMetaData.getId(cf_def.keyspace.toString(), cf_def.name.toString()));
// create a copy of the old CF meta data. Apply new settings on top of it.
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Feb 23 19:32:42 2011
@@ -861,7 +861,6 @@ public class Gossiper implements IFailur
public void addLocalApplicationState(ApplicationState state, VersionedValue value)
{
- assert !StorageService.instance.isClientMode();
EndpointState epState = endpointStateMap.get(FBUtilities.getLocalAddress());
assert epState != null;
epState.addApplicationState(state, value);
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Wed Feb 23 19:32:42 2011
@@ -59,7 +59,7 @@ public class ColumnFamilySplit extends I
public long getLength()
{
// only used for sorting splits. we don't have the capability, yet.
- return 0;
+ return Long.MAX_VALUE;
}
public String[] getLocations()
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Wed Feb 23 19:32:42 2011
@@ -134,9 +134,9 @@ implements Closeable, ICompactionInfo
{
logger.info(String.format("Compacting large row %s (%d bytes) incrementally",
ByteBufferUtil.bytesToHex(rows.get(0).getKey().key), rowSize));
- return new LazilyCompactedRow(cfs, rows, major, gcBefore);
+ return new LazilyCompactedRow(cfs, rows, major, gcBefore, false);
}
- return new PrecompactedRow(cfs, rows, major, gcBefore);
+ return new PrecompactedRow(cfs, rows, major, gcBefore, false);
}
public void close() throws IOException
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=1073896&r1=1073895&r2=1073896&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Wed Feb 23 19:32:42 2011
@@ -59,15 +59,17 @@ public class LazilyCompactedRow extends
private final boolean shouldPurge;
private final int gcBefore;
private final DataOutputBuffer headerBuffer;
+ private final boolean forceDeserialize;
private ColumnFamily emptyColumnFamily;
private LazyColumnIterator iter;
private int columnCount;
private long columnSerializedSize;
- public LazilyCompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
+ public LazilyCompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore, boolean forceDeserialize)
{
super(rows.get(0).getKey());
this.gcBefore = gcBefore;
+ this.forceDeserialize = forceDeserialize;
this.rows = new ArrayList<SSTableIdentityIterator>(rows);
Set<SSTable> sstables = new HashSet<SSTable>();
@@ -94,7 +96,7 @@ public class LazilyCompactedRow extends
public void write(DataOutput out) throws IOException
{
- if (rows.size() == 1 && !shouldPurge)
+ if (rows.size() == 1 && !shouldPurge && rows.get(0).sstable.descriptor.isLatestVersion && !forceDeserialize)
{
SSTableIdentityIterator row = rows.get(0);
out.writeLong(row.dataSize);