You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/25 20:57:04 UTC

svn commit: r1063431 - in /cassandra/branches/cassandra-0.7: CHANGES.txt src/java/org/apache/cassandra/cli/CliClient.java src/java/org/apache/cassandra/cli/CliOptions.java src/java/org/apache/cassandra/cli/CliSessionState.java

Author: jbellis
Date: Tue Jan 25 19:57:03 2011
New Revision: 1063431

URL: http://svn.apache.org/viewvc?rev=1063431&view=rev
Log:
CLI attemptsto block for new schemato propagate
patch by Pavel Yaskevich; reviewed by jbellis for CASSANDRA-2044

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliClient.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliOptions.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliSessionState.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1063431&r1=1063430&r2=1063431&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Jan 25 19:57:03 2011
@@ -38,6 +38,8 @@
  * add single-line "--" comments to CLI (CASSANDRA-2032)
  * message serialization tests (CASSANDRA-1923)
  * switch from ivy to maven-ant-tasks (CASSANDRA-2017)
+ * CLI attempts to block for new schema to propagate (CASSANDRA-2044)
+
 
 0.7.0-final
  * fix offsets to ByteBuffer.get (CASSANDRA-1939)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliClient.java?rev=1063431&r1=1063430&r2=1063431&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliClient.java Tue Jan 25 19:57:03 2011
@@ -669,7 +669,10 @@ public class CliClient extends CliUserHe
 
         try
         {
-            sessionState.out.println(thriftClient.system_add_keyspace(updateKsDefAttributes(statement, ksDef)));
+            String mySchemaVersion = thriftClient.system_add_keyspace(updateKsDefAttributes(statement, ksDef));
+            sessionState.out.println(mySchemaVersion);
+            validateSchemaIsSettled(mySchemaVersion);
+
             keyspacesMap.put(keyspaceName, thriftClient.describe_keyspace(keyspaceName));
         }
         catch (InvalidRequestException e)
@@ -697,7 +700,9 @@ public class CliClient extends CliUserHe
 
         try
         {
-            sessionState.out.println(thriftClient.system_add_column_family(updateCfDefAttributes(statement, cfDef)));
+            String mySchemaVersion = thriftClient.system_add_column_family(updateCfDefAttributes(statement, cfDef));
+            sessionState.out.println(mySchemaVersion);
+            validateSchemaIsSettled(mySchemaVersion);
             keyspacesMap.put(keySpace, thriftClient.describe_keyspace(keySpace));
         }
         catch (InvalidRequestException e)
@@ -726,7 +731,9 @@ public class CliClient extends CliUserHe
             KsDef currentKsDef = getKSMetaData(keyspaceName);
             KsDef updatedKsDef = updateKsDefAttributes(statement, currentKsDef);
 
-            sessionState.out.println(thriftClient.system_update_keyspace(updatedKsDef));
+            String mySchemaVersion = thriftClient.system_update_keyspace(updatedKsDef);
+            validateSchemaIsSettled(mySchemaVersion);
+            sessionState.out.println(mySchemaVersion);
             keyspacesMap.put(keyspaceName, thriftClient.describe_keyspace(keyspaceName));
         }
         catch (InvalidRequestException e)
@@ -754,7 +761,9 @@ public class CliClient extends CliUserHe
 
         try
         {
-            sessionState.out.println(thriftClient.system_update_column_family(updateCfDefAttributes(statement, cfDef)));
+            String mySchemaVersion = thriftClient.system_update_column_family(updateCfDefAttributes(statement, cfDef));
+            sessionState.out.println(mySchemaVersion);
+            validateSchemaIsSettled(mySchemaVersion);
             keyspacesMap.put(keySpace, thriftClient.describe_keyspace(keySpace));
         }
         catch (InvalidRequestException e)
@@ -902,7 +911,9 @@ public class CliClient extends CliUserHe
             return;
 
         String keyspaceName = CliCompiler.getKeySpace(statement, thriftClient.describe_keyspaces());
-        sessionState.out.println(thriftClient.system_drop_keyspace(keyspaceName));
+        String version = thriftClient.system_drop_keyspace(keyspaceName);
+        sessionState.out.println(version);
+        validateSchemaIsSettled(version);
     }
 
     /**
@@ -919,7 +930,9 @@ public class CliClient extends CliUserHe
             return;
 
         String cfName = CliCompiler.getColumnFamily(statement, keyspacesMap.get(keySpace).cf_defs);
-        sessionState.out.println(thriftClient.system_drop_column_family(cfName));
+        String mySchemaVersion = thriftClient.system_drop_column_family(cfName);
+        sessionState.out.println(mySchemaVersion);
+        validateSchemaIsSettled(mySchemaVersion);
     }
 
     private void executeList(Tree statement)
@@ -1981,6 +1994,51 @@ public class CliClient extends CliUserHe
         }
     }
 
+    /** validates schema is propagated to all nodes */
+    private void validateSchemaIsSettled(String currentVersionId)
+    {
+        Map<String, List<String>> versions;
+
+        long start = System.currentTimeMillis();
+        long limit = start + sessionState.schema_mwt;
+
+        boolean inAgreement = false;
+        while (limit - start >= 0)
+        {
+            try
+            {
+                versions = thriftClient.describe_schema_versions(); // getting schema version for nodes of the ring
+            }
+            catch (Exception e)
+            {
+                sessionState.err.println((e instanceof InvalidRequestException) ? ((InvalidRequestException) e).getWhy() : e.getMessage());
+                continue;
+            }
+
+            boolean currentlyInAgreement = true;
+            for (String version : versions.keySet())
+            {
+                if (!version.equals(currentVersionId))
+                {
+                    currentlyInAgreement = false;
+                    break; // only one disagreement is enough
+                }
+            }
+
+            if (currentlyInAgreement)
+            {
+                inAgreement = true;
+                break; // all nodes are in agreement no need to loop
+            }
+        }
+
+        if (!inAgreement)
+        {
+            sessionState.err.printf("The schema has not settled in %d seconds and further migrations are ill-advised until it does.%n", sessionState.schema_mwt / 1000);
+            System.exit(-1);
+        }
+    }
+
     private static class CfDefNamesComparator implements Comparator<CfDef>
     {
         public int compare(CfDef a, CfDef b)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliOptions.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliOptions.java?rev=1063431&r1=1063430&r2=1063431&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliOptions.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliOptions.java Tue Jan 25 19:57:03 2011
@@ -44,6 +44,7 @@ public class CliOptions
     private static final String FILE_OPTION = "file";
     private static final String JMX_PORT_OPTION = "jmxport";
     private static final String VERBOSE_OPTION  = "verbose";
+    private static final String SCHEMA_MIGRATION_WAIT_TIME = "schema-mwt";
 
     // Default values for optional command line arguments
     private static final int    DEFAULT_THRIFT_PORT = 9160;
@@ -59,15 +60,16 @@ public class CliOptions
         options.addOption("u",  USERNAME_OPTION, "USERNAME", "user name for cassandra authentication");
         options.addOption("pw", PASSWORD_OPTION, "PASSWORD", "password for cassandra authentication");
         options.addOption("k",  KEYSPACE_OPTION, "KEYSPACE", "cassandra keyspace user is authenticated against");
-        options.addOption("f",  FILE_OPTION,     "FILENAME", "load statements from the specific file.");
+        options.addOption("f",  FILE_OPTION,     "FILENAME", "load statements from the specific file");
         options.addOption(null, JMX_PORT_OPTION, "JMX-PORT", "JMX service port");
+        options.addOption(null, SCHEMA_MIGRATION_WAIT_TIME,  "TIME", "Schema migration wait time (secs.), default is 10 secs");
 
         // options without argument
         options.addOption("B",  BATCH_OPTION,   "enabled batch mode (suppress output; errors are fatal)");
         options.addOption(null, UNFRAME_OPTION, "use cassandra server's unframed transport");
         options.addOption(null, DEBUG_OPTION,   "display stack traces");
-        options.addOption("?",  HELP_OPTION,    "usage help.");
-        options.addOption("v",  VERBOSE_OPTION, "verbose output when using batch mode.");
+        options.addOption("?",  HELP_OPTION,    "usage help");
+        options.addOption("v",  VERBOSE_OPTION, "verbose output when using batch mode");
     }
 
     private static void printUsage()
@@ -160,6 +162,15 @@ public class CliOptions
                 css.verbose = true;
             }
 
+            if (cmd.hasOption(SCHEMA_MIGRATION_WAIT_TIME))
+            {
+                css.schema_mwt = Integer.parseInt(cmd.getOptionValue(SCHEMA_MIGRATION_WAIT_TIME)) * 1000;
+            }
+            else
+            {
+                css.schema_mwt = 10 * 1000;
+            }
+
             // Abort if there are any unrecognized arguments left
             if (cmd.getArgs().length > 0)
             {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliSessionState.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliSessionState.java?rev=1063431&r1=1063430&r2=1063431&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliSessionState.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliSessionState.java Tue Jan 25 19:57:03 2011
@@ -40,6 +40,7 @@ public class CliSessionState
     public String  filename = ""; // file to read commands from
     public int     jmxPort = 8080;// JMX service port
     public boolean verbose = false; // verbose output
+    public int     schema_mwt;    // Schema migration wait time (secs.)
     /*
      * Streams to read/write from
      */
@@ -82,5 +83,4 @@ public class CliSessionState
 
         return null;
     }
-
 }