You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/02/23 20:03:58 UTC

svn commit: r1073884 - in /cassandra/branches/cassandra-0.7: ./ contrib/client_only/conf/ contrib/client_only/src/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/c...

Author: gdusbabek
Date: Wed Feb 23 19:03:57 2011
New Revision: 1073884

URL: http://svn.apache.org/viewvc?rev=1073884&view=rev
Log:
fat clients were creating local data. patch by gdusbabek, reviewed by tjake. CASSANDRA-2223

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/contrib/client_only/conf/cassandra.yaml
    cassandra/branches/cassandra-0.7/contrib/client_only/src/ClientOnlyExample.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Feb 23 19:03:57 2011
@@ -24,6 +24,7 @@
  * add validateSchemaAgreement call + synchronization to schema
    modification operations (CASSANDRA-2222)
  * fix for reversed slice queries on large rows (CASSANDRA-2212)
+ * fat clients were writing local data (CASSANDRA-2223)
 
 
 0.7.2

Modified: cassandra/branches/cassandra-0.7/contrib/client_only/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/client_only/conf/cassandra.yaml?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/client_only/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.7/contrib/client_only/conf/cassandra.yaml Wed Feb 23 19:03:57 2011
@@ -34,6 +34,8 @@ hinted_handoff_enabled: true
 # this defines the maximum amount of time a dead host will have hints
 # generated.  After it has been dead this long, hints will be dropped.
 max_hint_window_in_ms: 3600000 # one hour
+# Sleep this long after delivering each row or row fragment
+hinted_handoff_throttle_delay_in_ms: 50
 
 # authentication backend, implementing IAuthenticator; used to identify users
 authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
@@ -90,6 +92,31 @@ commitlog_sync: periodic
 # milliseconds.
 commitlog_sync_period_in_ms: 10000
 
+# emergency pressure valve: each time heap usage after a full (CMS)
+# garbage collection is above this fraction of the max, Cassandra will
+# flush the largest memtables.  
+#
+# Set to 1.0 to disable.  Setting this lower than
+# CMSInitiatingOccupancyFraction is not likely to be useful.
+#
+# RELYING ON THIS AS YOUR PRIMARY TUNING MECHANISM WILL WORK POORLY:
+# it is most effective under light to moderate load, or read-heavy
+# workloads; under truly massive write load, it will often be too
+# little, too late.
+flush_largest_memtables_at: 0.75
+
+# emergency pressure valve #2: the first time heap usage after a full
+# (CMS) garbage collection is above this fraction of the max,
+# Cassandra will reduce cache maximum _capacity_ to the given fraction
+# of the current _size_.  Should usually be set substantially above
+# flush_largest_memtables_at, since that will have less long-term
+# impact on the system.  
+# 
+# Set to 1.0 to disable.  Setting this lower than
+# CMSInitiatingOccupancyFraction is not likely to be useful.
+reduce_cache_sizes_at: 0.85
+reduce_cache_capacity_to: 0.6
+
 # Addresses of hosts that are deemed contact points.
 # Cassandra nodes use this list of hosts to find each other and learn
 # the topology of the ring.  You must change this if you are running
@@ -199,6 +226,11 @@ column_index_size_in_kb: 64
 # will be logged specifying the row key.
 in_memory_compaction_limit_in_mb: 64
 
+# Track cached row keys during compaction, and re-cache their new
+# positions in the compacted sstable.  Disable if you use really large
+# key caches.
+compaction_preheat_key_cache: true
+
 # Time to wait for a reply from other nodes before failing the command 
 rpc_timeout_in_ms: 10000
 

Modified: cassandra/branches/cassandra-0.7/contrib/client_only/src/ClientOnlyExample.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/client_only/src/ClientOnlyExample.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/client_only/src/ClientOnlyExample.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/client_only/src/ClientOnlyExample.java Wed Feb 23 19:03:57 2011
@@ -43,8 +43,8 @@ public class ClientOnlyExample
 
     private static final String KEYSPACE = "Keyspace1";
     private static final String COLUMN_FAMILY = "Standard1";
-
-    private static void testWriting() throws Exception
+    
+    private static void startClient() throws Exception
     {
         StorageService.instance.initClient();
         // sleep for a bit so that gossip can do its thing.
@@ -56,7 +56,10 @@ public class ClientOnlyExample
         {
             throw new AssertionError(ex);
         }
+    }
 
+    private static void testWriting() throws Exception
+    {
         // do some writing.
         for (int i = 0; i < 100; i++)
         {
@@ -72,22 +75,10 @@ public class ClientOnlyExample
             System.out.println("wrote key" + i);
         }
         System.out.println("Done writing.");
-        StorageService.instance.stopClient();
     }
 
     private static void testReading() throws Exception
     {
-        StorageService.instance.initClient();
-        // sleep for a bit so that gossip can do its thing.
-        try
-        {
-            Thread.sleep(10000L);
-        }
-        catch (Exception ex)
-        {
-            throw new AssertionError(ex);
-        }
-
         // do some queries.
         Collection<ByteBuffer> cols = new ArrayList<ByteBuffer>()
         {{
@@ -114,11 +105,6 @@ public class ClientOnlyExample
             else
                 System.err.println("This output indicates that nothing was read.");
         }
-
-        // no need to do this:
-        // StorageService.instance().decommission();
-        // do this instead:
-        StorageService.instance.stopClient();
     }
 
     /**
@@ -137,17 +123,26 @@ public class ClientOnlyExample
      */
     public static void main(String args[]) throws Exception
     {
-        if (args.length == 0)
-            System.out.println("run with \"read\" or \"write\".");
-        else if ("read".equalsIgnoreCase(args[0]))
+        startClient();
+        setupKeyspace(createConnection());
+        testWriting();
+        logger.info("Writing is done. Sleeping, then will try to read.");
+        try
         {
-            testReading();
+            Thread.currentThread().sleep(10000);
         }
-        else if ("write".equalsIgnoreCase(args[0]))
+        catch (InterruptedException ex) 
         {
-            setupKeyspace(createConnection());
-            testWriting();
+            throw new RuntimeException(ex);
         }
+        
+        testReading();
+        
+        // no need to do this:
+        // StorageService.instance().decommission();
+        // do this instead:
+        StorageService.instance.stopClient();
+        System.exit(0); // the only way to really stop the process.
     }
     
     /**
@@ -159,15 +154,22 @@ public class ClientOnlyExample
         CfDef columnFamily = new CfDef(KEYSPACE, COLUMN_FAMILY);
         cfDefList.add(columnFamily);
 
-        client.system_add_keyspace(new KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", 1, cfDefList));
-        int magnitude = client.describe_ring(KEYSPACE).size();
-        try
+        try 
         {
-            Thread.sleep(1000 * magnitude);
+            client.system_add_keyspace(new KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", 1, cfDefList));
+            int magnitude = client.describe_ring(KEYSPACE).size();
+            try
+            {
+                Thread.sleep(1000 * magnitude);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
-        catch (InterruptedException e)
+        catch (InvalidRequestException probablyExists) 
         {
-            throw new RuntimeException(e);
+            logger.warn("Problem creating keyspace: " + probablyExists.getMessage());    
         }
     }
 

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Feb 23 19:03:57 2011
@@ -112,7 +112,7 @@ public class    DatabaseDescriptor
 
         return url;
     }
-
+    
     static
     {
         try
@@ -362,10 +362,6 @@ public class    DatabaseDescriptor
                     throw new ConfigurationException("saved_caches_directory missing");
             }
 
-            /* threshold after which commit log should be rotated. */
-            if (conf.commitlog_rotation_threshold_in_mb != null)
-                CommitLog.setSegmentSize(conf.commitlog_rotation_threshold_in_mb * 1024 * 1024);
-
             // Hardcoded system tables
             KSMetaData systemMeta = new KSMetaData(Table.SYSTEM_TABLE,
                                                    LocalStrategy.class,
@@ -902,6 +898,14 @@ public class    DatabaseDescriptor
         currentIndex = (currentIndex + 1) % conf.data_file_directories.length;
         return dataFileDirectory;
     }
+    
+    /* threshold after which commit log should be rotated. */
+    public static int getCommitLogSegmentSize() 
+    {
+        return conf.commitlog_rotation_threshold_in_mb != null ?
+               conf.commitlog_rotation_threshold_in_mb * 1024 * 1024 :
+               128*1024*1024;
+    }
 
     public static String getCommitLogLocation()
     {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Feb 23 19:03:57 2011
@@ -256,6 +256,15 @@ public class HintedHandOffManager implem
     {
         Gossiper gossiper = Gossiper.instance;
         int waited = 0;
+        // first, wait for schema to be gossiped.
+        while (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null) {
+            Thread.sleep(1000);
+            waited += 1000;
+            if (waited > 2 * StorageService.RING_DELAY)
+                throw new RuntimeException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
+        }
+        waited = 0;
+        // then wait for the correct schema version.
         while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
                 gossiper.getEndpointStateForEndpoint(FBUtilities.getLocalAddress()).getApplicationState(ApplicationState.SCHEMA).value))
         {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java Wed Feb 23 19:03:57 2011
@@ -68,15 +68,18 @@ public class Table
 
     // It is possible to call Table.open without a running daemon, so it makes sense to ensure
     // proper directories here as well as in CassandraDaemon.
-    static
+    static 
     {
-        try
-        {
-            DatabaseDescriptor.createAllDirectories();
-        }
-        catch (IOException ex)
+        if (!StorageService.instance.isClientMode()) 
         {
-            throw new IOError(ex);
+            try
+            {
+                DatabaseDescriptor.createAllDirectories();
+            }
+            catch (IOException ex)
+            {
+                throw new IOError(ex);
+            }
         }
     }
 
@@ -231,7 +234,8 @@ public class Table
             try
             {
                 String keyspaceDir = dataDir + File.separator + table;
-                FileUtils.createDirectory(keyspaceDir);
+                if (!StorageService.instance.isClientMode())
+                    FileUtils.createDirectory(keyspaceDir);
     
                 // remove the deprecated streaming directory.
                 File streamingDir = new File(keyspaceDir, "stream");

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Wed Feb 23 19:03:57 2011
@@ -78,20 +78,16 @@ import org.apache.cassandra.utils.Wrappe
 public class CommitLog
 {
     private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
-    private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
-
+    
     static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
 
     public static final CommitLog instance = new CommitLog();
 
     private final Deque<CommitLogSegment> segments = new ArrayDeque<CommitLogSegment>();
 
-    public static void setSegmentSize(int size)
-    {
-        SEGMENT_SIZE = size;
-    }
-
     private final ICommitLogExecutorService executor;
+    
+    private volatile int segmentSize = 128*1024*1024; // roll after log gets this big
 
     /**
      * param @ table - name of table for which we are maintaining
@@ -104,6 +100,7 @@ public class CommitLog
         try
         {
             DatabaseDescriptor.createAllDirectories();
+            segmentSize = DatabaseDescriptor.getCommitLogSegmentSize();
         }
         catch (IOException e)
         {
@@ -478,7 +475,7 @@ public class CommitLog
             {
                 currentSegment().write(rowMutation, serializedRow);
                 // roll log if necessary
-                if (currentSegment().length() >= SEGMENT_SIZE)
+                if (currentSegment().length() >= segmentSize)
                 {
                     sync();
                     segments.add(new CommitLogSegment());

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java Wed Feb 23 19:03:57 2011
@@ -82,13 +82,16 @@ public abstract class Migration
     protected transient boolean clientMode;
     
     /** Subclasses must have a matching constructor */
-    protected Migration() { /* pass */ }
+    protected Migration() 
+    {
+        clientMode = StorageService.instance.isClientMode();
+    }
 
     Migration(UUID newVersion, UUID lastVersion)
     {
+        this();
         this.newVersion = newVersion;
         this.lastVersion = lastVersion;
-        clientMode = StorageService.instance.isClientMode();
     }
     
     // block compactions and flushing.

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java Wed Feb 23 19:03:57 2011
@@ -921,7 +921,6 @@ public class Gossiper implements IFailur
 
     public void addLocalApplicationState(ApplicationState state, VersionedValue value)
     {
-        assert !StorageService.instance.isClientMode();
         EndpointState epState = endpointStateMap_.get(localEndpoint_);
         assert epState != null;
         epState.addApplicationState(state, value);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1073884&r1=1073883&r2=1073884&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java Wed Feb 23 19:03:57 2011
@@ -71,7 +71,10 @@ public class MigrationManager implements
 
     public void onRemove(InetAddress endpoint) { }
     
-    /** will either push or pull an updating depending on who is behind. */
+    /** 
+     * will either push or pull an updating depending on who is behind.
+     * fat clients should never push their schemas (since they have no local storage).
+     */
     public static void rectify(UUID theirVersion, InetAddress endpoint)
     {
         UUID myVersion = DatabaseDescriptor.getDefsVersion();
@@ -82,7 +85,7 @@ public class MigrationManager implements
             logger.debug("My data definitions are old. Asking for updates since {}", myVersion.toString());
             announce(myVersion, Collections.singleton(endpoint));
         }
-        else
+        else if (!StorageService.instance.isClientMode())
         {
             logger.debug("Their data definitions are old. Sending updates since {}", theirVersion.toString());
             pushMigrations(theirVersion, myVersion, endpoint);
@@ -101,8 +104,7 @@ public class MigrationManager implements
     /** announce my version passively over gossip **/
     public static void passiveAnnounce(UUID version)
     {
-        if (!StorageService.instance.isClientMode())
-            Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version));
+        Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version));
         logger.debug("Announcing my schema is " + version);
     }