Posted to user@cassandra.apache.org by Tharindu Mathew <mc...@gmail.com> on 2011/08/02 20:04:55 UTC

Problem with Schema change propagation time

I ran into a problem when trying to execute the following code through Hector.

private boolean createCF(String CFName) {
    BasicColumnFamilyDefinition columnFamilyDefinition = new BasicColumnFamilyDefinition();
    columnFamilyDefinition.setColumnType(ColumnType.STANDARD);
    columnFamilyDefinition.setName(CFName);
    columnFamilyDefinition.setKeyspaceName(ReceiverConstants.BAM_KEYSPACE);

    synchronized (this) {
        // Look for the CF among the definitions held in bamKeyspaceDefinition
        boolean cfDefFound = false;
        for (ColumnFamilyDefinition cfDef : bamKeyspaceDefinition.getCfDefs()) {
            log.info("CF found : " + cfDef.getName());
            if (cfDef.getName().equals(CFName)) {
                cfDefFound = true;
                break;
            }
        }
        // Column Family not found, so create it
        if (!cfDefFound) {
            ThriftCfDef cfDef = new ThriftCfDef(columnFamilyDefinition);
            cluster.addColumnFamily(cfDef);
        }
        cfList.add(CFName);
    }
    return true;
}

Even though the code block is synchronized, under load other threads that
enter this block still do not see that the CF has been added, which results
in the exception shown in [1].

A workaround would be to add a Thread.sleep(), but that seems too hacky to
me :(. Is there a proper way to fix this? Perhaps there is a blocking method
that doesn't return until the schema change has propagated to all nodes.

BTW, I only have one node running.

Any help would be greatly appreciated.

-- 
Regards,

Tharindu


[1] -

[2011-08-02 22:46:44,892]  INFO {me.prettyprint.cassandra.hector.TimingLogger} -  start[1312305404883] time[9] tag[META_WRITE.fail_]
me.prettyprint.hector.api.exceptions.HInvalidRequestException: InvalidRequestException(why:CF is already defined in that keyspace.)
	at me.prettyprint.cassandra.service.ExceptionsTranslatorImpl.translate(ExceptionsTranslatorImpl.java:42)
	at me.prettyprint.cassandra.service.ThriftCluster$3.execute(ThriftCluster.java:68)
	at me.prettyprint.cassandra.service.ThriftCluster$3.execute(ThriftCluster.java:62)
	at me.prettyprint.cassandra.service.Operation.executeAndSetResult(Operation.java:101)
	at me.prettyprint.cassandra.connection.HConnectionManager.operateWithFailover(HConnectionManager.java:156)
	at me.prettyprint.cassandra.service.ThriftCluster.addColumnFamily(ThriftCluster.java:72)
	at org.wso2.carbon.bam.receiver.persistence.NoSQLDataStore.createCF(NoSQLDataStore.java:168)
	at org.wso2.carbon.bam.receiver.persistence.NoSQLDataStore.persistData(NoSQLDataStore.java:124)
	at org.wso2.carbon.bam.receiver.persistence.PersistenceManager.persistEvent(PersistenceManager.java:55)
	at org.wso2.carbon.bam.receiver.internal.QueueWorker.run(QueueWorker.java:69)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:619)
Caused by: InvalidRequestException(why:CF is already defined in that keyspace.)
	at org.apache.cassandra.thrift.Cassandra$system_add_column_family_result.read(Cassandra.java:23375)
	at org.apache.cassandra.thrift.Cassandra$Client.recv_system_add_column_family(Cassandra.java:1333)
	at org.apache.cassandra.thrift.Cassandra$Client.system_add_column_family(Cassandra.java:1308)
	at me.prettyprint.cassandra.service.ThriftCluster$3.execute(ThriftCluster.java:66)
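The thread does not show where bamKeyspaceDefinition comes from, so the following is an assumption: if it is a keyspace definition fetched once (say, at startup) and never refreshed, the loop in createCF checks a stale snapshot. A second thread asking for the same CF then calls addColumnFamily again and gets "CF is already defined", even on a single node where schema propagation is not a factor. The sketch below reproduces that with a hypothetical in-memory FakeCluster standing in for Hector's cluster; StaleCheckDemo, FakeCluster, createCfStale and createCfFresh are illustrative names, not Hector API. The "fresh" variant re-reads the live CF list inside the lock; with Hector that would presumably mean calling cluster.describeKeyspace(...) again rather than reusing a cached definition.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class StaleCheckDemo {

    /** Hypothetical in-memory stand-in for the cluster; not Hector API. */
    static class FakeCluster {
        private final Set<String> cfs = new HashSet<String>();

        synchronized List<String> describeCfNames() {
            // Return a copy, like a definition fetched from the server at one point in time.
            return new ArrayList<String>(cfs);
        }

        synchronized void addColumnFamily(String name) {
            if (!cfs.add(name)) {
                // Mirrors the server-side error seen in the stack trace.
                throw new IllegalStateException("CF is already defined in that keyspace.");
            }
        }
    }

    private final FakeCluster cluster;
    private final List<String> snapshot; // fetched once, never refreshed (the assumed bug)

    StaleCheckDemo(FakeCluster cluster) {
        this.cluster = cluster;
        this.snapshot = cluster.describeCfNames();
    }

    /** Same shape as the original createCF: the check uses the stale snapshot. */
    synchronized void createCfStale(String name) {
        if (!snapshot.contains(name)) {
            cluster.addColumnFamily(name);
        }
    }

    /** Re-reads the live CF list inside the lock before creating. */
    synchronized void createCfFresh(String name) {
        if (!cluster.describeCfNames().contains(name)) {
            cluster.addColumnFamily(name);
        }
    }

    public static void main(String[] args) {
        StaleCheckDemo fresh = new StaleCheckDemo(new FakeCluster());
        fresh.createCfFresh("events");
        fresh.createCfFresh("events"); // second call finds the CF and does nothing
        System.out.println("fresh: ok");

        StaleCheckDemo stale = new StaleCheckDemo(new FakeCluster());
        stale.createCfStale("events");
        try {
            stale.createCfStale("events"); // snapshot still lacks "events"
        } catch (IllegalStateException e) {
            System.out.println("stale: " + e.getMessage());
        }
    }
}
```

Running main prints "fresh: ok" followed by "stale: CF is already defined in that keyspace." If this assumption matches the real code, refreshing the definition (or checking cfList first) inside the synchronized block avoids the duplicate create without any sleep.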

Re: Problem with Schema change propagation time

Posted by Tharindu Mathew <mc...@gmail.com>.
Thanks, Aaron.

I'll take a look at that. I'll also raise this on the Hector list, since I'm
interested in an out-of-the-box solution from the Hector API itself.

On Wed, Aug 3, 2011 at 4:05 AM, Aaron Morton <aa...@thelastpickle.com> wrote:

> You want to call the Thrift function describe_schema_versions. It tells you
> which nodes are on which schema version; wait until there is only a single
> version.
>
> From memory, this is what the Cassandra CLI does; have a look at the code
> in the o.a.c.cli package.
>
> Cheers
>
>
>
> -----------------
> Aaron Morton
> Freelance Cassandra Developer
> @aaronmorton
> http://www.thelastpickle.com


-- 
Regards,

Tharindu

Re: Problem with Schema change propagation time

Posted by Aaron Morton <aa...@thelastpickle.com>.
You want to call the Thrift function describe_schema_versions. It tells you which nodes are on which schema version; wait until there is only a single version.

From memory, this is what the Cassandra CLI does; have a look at the code in the o.a.c.cli package.

Cheers



-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton
http://www.thelastpickle.com
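A minimal sketch of the wait loop Aaron describes, assuming the Thrift call is wrapped behind a small interface so the loop can run without a cluster. In the Java Thrift bindings the call is Cassandra.Client.describe_schema_versions(), which returns a Map<String, List<String>> from schema version to the nodes reporting it. SchemaAgreement, SchemaVersionSource, countVersions and waitForAgreement are hypothetical names, and the timeout and poll interval are arbitrary. The sketch also assumes that nodes which cannot be contacted are grouped under an "UNREACHABLE" key and leaves that entry out of the count.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SchemaAgreement {

    /** Hypothetical hook; real code would wrap Cassandra.Client.describe_schema_versions(). */
    public interface SchemaVersionSource {
        Map<String, List<String>> describeSchemaVersions() throws Exception;
    }

    // Key assumed to hold the nodes that could not be contacted.
    static final String UNREACHABLE = "UNREACHABLE";

    /** Counts distinct schema versions, ignoring the UNREACHABLE bucket. */
    static int countVersions(Map<String, List<String>> versions) {
        int n = 0;
        for (String version : versions.keySet()) {
            if (!UNREACHABLE.equals(version)) {
                n++;
            }
        }
        return n;
    }

    /**
     * Polls until every reachable node reports the same schema version.
     * Returns true on agreement, false if the timeout expires first.
     */
    public static boolean waitForAgreement(SchemaVersionSource source, long timeoutMillis,
                                           long pollMillis) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            if (countVersions(source.describeSchemaVersions()) == 1) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            Thread.sleep(pollMillis);
        }
    }

    public static void main(String[] args) throws Exception {
        // Fake source: two versions on the first two polls, a single version afterwards.
        final int[] calls = {0};
        SchemaVersionSource fake = new SchemaVersionSource() {
            public Map<String, List<String>> describeSchemaVersions() {
                Map<String, List<String>> m = new HashMap<String, List<String>>();
                m.put("v2", Arrays.asList("127.0.0.1"));
                if (calls[0]++ < 2) {
                    m.put("v1", Arrays.asList("127.0.0.2"));
                }
                return m;
            }
        };
        System.out.println(waitForAgreement(fake, 1000, 10)); // prints true
    }
}
```

One place to call it would be right after cluster.addColumnFamily(cfDef), still inside the synchronized block. Returning false on timeout, instead of throwing, leaves it to the caller to decide whether to retry or give up.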
