You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/02/23 20:03:58 UTC
svn commit: r1073884 - in /cassandra/branches/cassandra-0.7: ./
contrib/client_only/conf/ contrib/client_only/src/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/c...
Author: gdusbabek
Date: Wed Feb 23 19:03:57 2011
New Revision: 1073884
URL: http://svn.apache.org/viewvc?rev=1073884&view=rev
Log:
fat clients were creating local data. patch by gdusbabek, reviewed by tjake. CASSANDRA-2223
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/contrib/client_only/conf/cassandra.yaml
cassandra/branches/cassandra-0.7/contrib/client_only/src/ClientOnlyExample.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Feb 23 19:03:57 2011
@@ -24,6 +24,7 @@
* add validateSchemaAgreement call + synchronization to schema
modification operations (CASSANDRA-2222)
* fix for reversed slice queries on large rows (CASSANDRA-2212)
+ * fat clients were writing local data (CASSANDRA-2223)
0.7.2
Modified: cassandra/branches/cassandra-0.7/contrib/client_only/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/client_only/conf/cassandra.yaml?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/client_only/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.7/contrib/client_only/conf/cassandra.yaml Wed Feb 23 19:03:57 2011
@@ -34,6 +34,8 @@ hinted_handoff_enabled: true
# this defines the maximum amount of time a dead host will have hints
# generated. After it has been dead this long, hints will be dropped.
max_hint_window_in_ms: 3600000 # one hour
+# Sleep this long after delivering each row or row fragment
+hinted_handoff_throttle_delay_in_ms: 50
# authentication backend, implementing IAuthenticator; used to identify users
authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
@@ -90,6 +92,31 @@ commitlog_sync: periodic
# milliseconds.
commitlog_sync_period_in_ms: 10000
+# emergency pressure valve: each time heap usage after a full (CMS)
+# garbage collection is above this fraction of the max, Cassandra will
+# flush the largest memtables.
+#
+# Set to 1.0 to disable. Setting this lower than
+# CMSInitiatingOccupancyFraction is not likely to be useful.
+#
+# RELYING ON THIS AS YOUR PRIMARY TUNING MECHANISM WILL WORK POORLY:
+# it is most effective under light to moderate load, or read-heavy
+# workloads; under truly massive write load, it will often be too
+# little, too late.
+flush_largest_memtables_at: 0.75
+
+# emergency pressure valve #2: the first time heap usage after a full
+# (CMS) garbage collection is above this fraction of the max,
+# Cassandra will reduce cache maximum _capacity_ to the given fraction
+# of the current _size_. Should usually be set substantially above
+# flush_largest_memtables_at, since that will have less long-term
+# impact on the system.
+#
+# Set to 1.0 to disable. Setting this lower than
+# CMSInitiatingOccupancyFraction is not likely to be useful.
+reduce_cache_sizes_at: 0.85
+reduce_cache_capacity_to: 0.6
+
# Addresses of hosts that are deemed contact points.
# Cassandra nodes use this list of hosts to find each other and learn
# the topology of the ring. You must change this if you are running
@@ -199,6 +226,11 @@ column_index_size_in_kb: 64
# will be logged specifying the row key.
in_memory_compaction_limit_in_mb: 64
+# Track cached row keys during compaction, and re-cache their new
+# positions in the compacted sstable. Disable if you use really large
+# key caches.
+compaction_preheat_key_cache: true
+
# Time to wait for a reply from other nodes before failing the command
rpc_timeout_in_ms: 10000
Modified: cassandra/branches/cassandra-0.7/contrib/client_only/src/ClientOnlyExample.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/client_only/src/ClientOnlyExample.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/client_only/src/ClientOnlyExample.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/client_only/src/ClientOnlyExample.java Wed Feb 23 19:03:57 2011
@@ -43,8 +43,8 @@ public class ClientOnlyExample
private static final String KEYSPACE = "Keyspace1";
private static final String COLUMN_FAMILY = "Standard1";
-
- private static void testWriting() throws Exception
+
+ private static void startClient() throws Exception
{
StorageService.instance.initClient();
// sleep for a bit so that gossip can do its thing.
@@ -56,7 +56,10 @@ public class ClientOnlyExample
{
throw new AssertionError(ex);
}
+ }
+ private static void testWriting() throws Exception
+ {
// do some writing.
for (int i = 0; i < 100; i++)
{
@@ -72,22 +75,10 @@ public class ClientOnlyExample
System.out.println("wrote key" + i);
}
System.out.println("Done writing.");
- StorageService.instance.stopClient();
}
private static void testReading() throws Exception
{
- StorageService.instance.initClient();
- // sleep for a bit so that gossip can do its thing.
- try
- {
- Thread.sleep(10000L);
- }
- catch (Exception ex)
- {
- throw new AssertionError(ex);
- }
-
// do some queries.
Collection<ByteBuffer> cols = new ArrayList<ByteBuffer>()
{{
@@ -114,11 +105,6 @@ public class ClientOnlyExample
else
System.err.println("This output indicates that nothing was read.");
}
-
- // no need to do this:
- // StorageService.instance().decommission();
- // do this instead:
- StorageService.instance.stopClient();
}
/**
@@ -137,17 +123,26 @@ public class ClientOnlyExample
*/
public static void main(String args[]) throws Exception
{
- if (args.length == 0)
- System.out.println("run with \"read\" or \"write\".");
- else if ("read".equalsIgnoreCase(args[0]))
+ startClient();
+ setupKeyspace(createConnection());
+ testWriting();
+ logger.info("Writing is done. Sleeping, then will try to read.");
+ try
{
- testReading();
+ Thread.currentThread().sleep(10000);
}
- else if ("write".equalsIgnoreCase(args[0]))
+ catch (InterruptedException ex)
{
- setupKeyspace(createConnection());
- testWriting();
+ throw new RuntimeException(ex);
}
+
+ testReading();
+
+ // no need to do this:
+ // StorageService.instance().decommission();
+ // do this instead:
+ StorageService.instance.stopClient();
+ System.exit(0); // the only way to really stop the process.
}
/**
@@ -159,15 +154,22 @@ public class ClientOnlyExample
CfDef columnFamily = new CfDef(KEYSPACE, COLUMN_FAMILY);
cfDefList.add(columnFamily);
- client.system_add_keyspace(new KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", 1, cfDefList));
- int magnitude = client.describe_ring(KEYSPACE).size();
- try
+ try
{
- Thread.sleep(1000 * magnitude);
+ client.system_add_keyspace(new KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", 1, cfDefList));
+ int magnitude = client.describe_ring(KEYSPACE).size();
+ try
+ {
+ Thread.sleep(1000 * magnitude);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- catch (InterruptedException e)
+ catch (InvalidRequestException probablyExists)
{
- throw new RuntimeException(e);
+ logger.warn("Problem creating keyspace: " + probablyExists.getMessage());
}
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Feb 23 19:03:57 2011
@@ -112,7 +112,7 @@ public class DatabaseDescriptor
return url;
}
-
+
static
{
try
@@ -362,10 +362,6 @@ public class DatabaseDescriptor
throw new ConfigurationException("saved_caches_directory missing");
}
- /* threshold after which commit log should be rotated. */
- if (conf.commitlog_rotation_threshold_in_mb != null)
- CommitLog.setSegmentSize(conf.commitlog_rotation_threshold_in_mb * 1024 * 1024);
-
// Hardcoded system tables
KSMetaData systemMeta = new KSMetaData(Table.SYSTEM_TABLE,
LocalStrategy.class,
@@ -902,6 +898,14 @@ public class DatabaseDescriptor
currentIndex = (currentIndex + 1) % conf.data_file_directories.length;
return dataFileDirectory;
}
+
+ /* threshold after which commit log should be rotated. */
+ public static int getCommitLogSegmentSize()
+ {
+ return conf.commitlog_rotation_threshold_in_mb != null ?
+ conf.commitlog_rotation_threshold_in_mb * 1024 * 1024 :
+ 128*1024*1024;
+ }
public static String getCommitLogLocation()
{
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Feb 23 19:03:57 2011
@@ -256,6 +256,15 @@ public class HintedHandOffManager implem
{
Gossiper gossiper = Gossiper.instance;
int waited = 0;
+ // first, wait for schema to be gossiped.
+ while (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null) {
+ Thread.sleep(1000);
+ waited += 1000;
+ if (waited > 2 * StorageService.RING_DELAY)
+ throw new RuntimeException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
+ }
+ waited = 0;
+ // then wait for the correct schema version.
while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
gossiper.getEndpointStateForEndpoint(FBUtilities.getLocalAddress()).getApplicationState(ApplicationState.SCHEMA).value))
{
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java Wed Feb 23 19:03:57 2011
@@ -68,15 +68,18 @@ public class Table
// It is possible to call Table.open without a running daemon, so it makes sense to ensure
// proper directories here as well as in CassandraDaemon.
- static
+ static
{
- try
- {
- DatabaseDescriptor.createAllDirectories();
- }
- catch (IOException ex)
+ if (!StorageService.instance.isClientMode())
{
- throw new IOError(ex);
+ try
+ {
+ DatabaseDescriptor.createAllDirectories();
+ }
+ catch (IOException ex)
+ {
+ throw new IOError(ex);
+ }
}
}
@@ -231,7 +234,8 @@ public class Table
try
{
String keyspaceDir = dataDir + File.separator + table;
- FileUtils.createDirectory(keyspaceDir);
+ if (!StorageService.instance.isClientMode())
+ FileUtils.createDirectory(keyspaceDir);
// remove the deprecated streaming directory.
File streamingDir = new File(keyspaceDir, "stream");
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Wed Feb 23 19:03:57 2011
@@ -78,20 +78,16 @@ import org.apache.cassandra.utils.Wrappe
public class CommitLog
{
private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
- private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
-
+
static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
public static final CommitLog instance = new CommitLog();
private final Deque<CommitLogSegment> segments = new ArrayDeque<CommitLogSegment>();
- public static void setSegmentSize(int size)
- {
- SEGMENT_SIZE = size;
- }
-
private final ICommitLogExecutorService executor;
+
+ private volatile int segmentSize = 128*1024*1024; // roll after log gets this big
/**
* param @ table - name of table for which we are maintaining
@@ -104,6 +100,7 @@ public class CommitLog
try
{
DatabaseDescriptor.createAllDirectories();
+ segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
}
catch (IOException e)
{
@@ -478,7 +475,7 @@ public class CommitLog
{
currentSegment().write(rowMutation, serializedRow);
// roll log if necessary
- if (currentSegment().length() >= SEGMENT_SIZE)
+ if (currentSegment().length() >= segmentSize)
{
sync();
segments.add(new CommitLogSegment());
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java Wed Feb 23 19:03:57 2011
@@ -82,13 +82,16 @@ public abstract class Migration
protected transient boolean clientMode;
/** Subclasses must have a matching constructor */
- protected Migration() { /* pass */ }
+ protected Migration()
+ {
+ clientMode = StorageService.instance.isClientMode();
+ }
Migration(UUID newVersion, UUID lastVersion)
{
+ this();
this.newVersion = newVersion;
this.lastVersion = lastVersion;
- clientMode = StorageService.instance.isClientMode();
}
// block compactions and flushing.
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java Wed Feb 23 19:03:57 2011
@@ -921,7 +921,6 @@ public class Gossiper implements IFailur
public void addLocalApplicationState(ApplicationState state, VersionedValue value)
{
- assert !StorageService.instance.isClientMode();
EndpointState epState = endpointStateMap_.get(localEndpoint_);
assert epState != null;
epState.addApplicationState(state, value);
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java Wed Feb 23 19:03:57 2011
@@ -71,7 +71,10 @@ public class MigrationManager implements
public void onRemove(InetAddress endpoint) { }
- /** will either push or pull an updating depending on who is behind. */
+ /**
+ * will either push or pull an updating depending on who is behind.
+ * fat clients should never push their schemas (since they have no local storage).
+ */
public static void rectify(UUID theirVersion, InetAddress endpoint)
{
UUID myVersion = DatabaseDescriptor.getDefsVersion();
@@ -82,7 +85,7 @@ public class MigrationManager implements
logger.debug("My data definitions are old. Asking for updates since {}", myVersion.toString());
announce(myVersion, Collections.singleton(endpoint));
}
- else
+ else if (!StorageService.instance.isClientMode())
{
logger.debug("Their data definitions are old. Sending updates since {}", theirVersion.toString());
pushMigrations(theirVersion, myVersion, endpoint);
@@ -101,8 +104,7 @@ public class MigrationManager implements
/** announce my version passively over gossip **/
public static void passiveAnnounce(UUID version)
{
- if (!StorageService.instance.isClientMode())
- Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version));
+ Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version));
logger.debug("Announcing my schema is " + version);
}