You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/01/09 03:55:03 UTC
[1/5] git commit: Fix execution of LOCAL_QUORUM queries with
SimpleStrategy
Updated Branches:
refs/heads/cassandra-2.0 f8fd7db67 -> de19f963a
refs/heads/trunk c2294aa21 -> db07b20ed
Fix execution of LOCAL_QUORUM queries with SimpleStrategy
patch by alexliu68; reviewed by slebresne for CASSANDRA-6545
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/32d7cb50
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/32d7cb50
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/32d7cb50
Branch: refs/heads/trunk
Commit: 32d7cb5066050ef6123f50a25c6e9b4c9e180ea0
Parents: 2a7c20e
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 8 17:14:58 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 8 17:15:20 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ConsistencyLevel.java | 60 +++++++++++++-------
.../locator/AbstractReplicationStrategy.java | 2 +-
3 files changed, 42 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/32d7cb50/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5a85977..cba97d0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@
* fsync compression metadata (CASSANDRA-6531)
* Validate CF existence on execution for prepared statement (CASSANDRA-6535)
* Add ability to throttle batchlog replay (CASSANDRA-6550)
+ * Fix executing LOCAL_QUORUM with SimpleStrategy (CASSANDRA-6545)
1.2.13
http://git-wip-us.apache.org/repos/asf/cassandra/blob/32d7cb50/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 4d72767..3737c73 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -88,9 +88,16 @@ public enum ConsistencyLevel
return codeIdx[code];
}
+ private int quorumFor(Table table)
+ {
+ return (table.getReplicationStrategy().getReplicationFactor() / 2) + 1;
+ }
+
private int localQuorumFor(Table table, String dc)
{
- return (((NetworkTopologyStrategy) table.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1;
+ return (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+ ? (((NetworkTopologyStrategy) table.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1
+ : quorumFor(table);
}
public int blockFor(Table table)
@@ -107,17 +114,24 @@ public enum ConsistencyLevel
case THREE:
return 3;
case QUORUM:
- return (table.getReplicationStrategy().getReplicationFactor() / 2) + 1;
+ return quorumFor(table);
case ALL:
return table.getReplicationStrategy().getReplicationFactor();
case LOCAL_QUORUM:
return localQuorumFor(table, DatabaseDescriptor.getLocalDataCenter());
case EACH_QUORUM:
- NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
- int n = 0;
- for (String dc : strategy.getDatacenters())
- n += localQuorumFor(table, dc);
- return n;
+ if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+ {
+ NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
+ int n = 0;
+ for (String dc : strategy.getDatacenters())
+ n += localQuorumFor(table, dc);
+ return n;
+ }
+ else
+ {
+ return quorumFor(table);
+ }
default:
throw new UnsupportedOperationException("Invalid consistency level: " + toString());
}
@@ -208,16 +222,20 @@ public enum ConsistencyLevel
// local hint is acceptable, and local node is always live
return true;
case LOCAL_ONE:
- return countLocalEndpoints(liveEndpoints) >= 1;
+ return countLocalEndpoints(liveEndpoints) >= 1;
case LOCAL_QUORUM:
return countLocalEndpoints(liveEndpoints) >= blockFor(table);
case EACH_QUORUM:
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+ if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
{
- if (entry.getValue() < localQuorumFor(table, entry.getKey()))
- return false;
+ for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+ {
+ if (entry.getValue() < localQuorumFor(table, entry.getKey()))
+ return false;
+ }
+ return true;
}
- return true;
+ // Fallthrough on purpose for SimpleStrategy
default:
return Iterables.size(liveEndpoints) >= blockFor(table);
}
@@ -250,14 +268,18 @@ public enum ConsistencyLevel
}
break;
case EACH_QUORUM:
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+ if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
{
- int dcBlockFor = localQuorumFor(table, entry.getKey());
- int dcLive = entry.getValue();
- if (dcLive < dcBlockFor)
- throw new UnavailableException(this, dcBlockFor, dcLive);
+ for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
+ {
+ int dcBlockFor = localQuorumFor(table, entry.getKey());
+ int dcLive = entry.getValue();
+ if (dcLive < dcBlockFor)
+ throw new UnavailableException(this, dcBlockFor, dcLive);
+ }
+ break;
}
- break;
+ // Fallthrough on purpose for SimpleStrategy
default:
int live = Iterables.size(liveEndpoints);
if (live < blockFor)
@@ -282,8 +304,6 @@ public enum ConsistencyLevel
public void validateForWrite(String table) throws InvalidRequestException
{
- if(this == EACH_QUORUM)
- requireNetworkTopologyStrategy(table);
}
public void validateCounterForWrite(CFMetaData metadata) throws InvalidRequestException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/32d7cb50/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index a48bec9..e4dd422 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -134,7 +134,7 @@ public abstract class AbstractReplicationStrategy
// block for in this context will be localnodes block.
return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
}
- else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
+ else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
{
return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
}
[3/5] git commit: Wait for gossip to settle before accepting client
connections patch by Chris Burroughs;
reviewed by Tyler Hobbs and jbellis for CASSANDRA-4288
Posted by jb...@apache.org.
Wait for gossip to settle before accepting client connections
patch by Chris Burroughs; reviewed by Tyler Hobbs and jbellis for CASSANDRA-4288
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de19f963
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de19f963
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de19f963
Branch: refs/heads/trunk
Commit: de19f963aeed2752374d2f84c1b230f6cab253f1
Parents: f8fd7db
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Jan 8 20:53:46 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Jan 8 20:53:46 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/CassandraDaemon.java | 50 ++++++++++++++++++++
2 files changed, 51 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de19f963/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d0b63a0..e96a8e0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.5
+ * Wait for gossip to settle before accepting client connections (CASSANDRA-4288)
* Delete unfinished compaction incrementally (CASSANDRA-6086)
* Allow specifying custom secondary index options in CQL3 (CASSANDRA-6480)
Merged from 1.2:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de19f963/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index d36b0db..d497a38 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -34,10 +34,14 @@ import javax.management.StandardMBean;
import com.addthis.metrics.reporter.config.ReporterConfig;
import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
@@ -365,6 +369,8 @@ public class CassandraDaemon
}
}
+ waitForGossipToSettle();
+
// Thrift
InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
int rpcPort = DatabaseDescriptor.getRpcPort();
@@ -489,6 +495,50 @@ public class CassandraDaemon
destroy();
}
+
+ private void waitForGossipToSettle()
+ {
+ int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
+ if (forceAfter == 0)
+ {
+ return;
+ }
+ final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000;
+ final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000;
+ final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
+
+ logger.info("waiting for gossip to settle before accepting client requests...");
+ Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
+ int totalPolls = 0;
+ int numOkay = 0;
+ JMXEnabledThreadPoolExecutor gossipStage = (JMXEnabledThreadPoolExecutor)StageManager.getStage(Stage.GOSSIP);
+ while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
+ {
+ Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ long completed = gossipStage.getCompletedTasks();
+ long active = gossipStage.getActiveCount();
+ long pending = gossipStage.getPendingTasks();
+ totalPolls++;
+ if (active == 0 && pending == 0)
+ {
+ logger.debug("gossip looks settled. CompletedTasks: {}", completed);
+ numOkay++;
+ }
+ else
+ {
+ logger.info("gossip not settled after {} polls. Gossip Stage active/pending/completed: {}/{}/{}", totalPolls, active, pending, completed);
+ numOkay = 0;
+ }
+ if (forceAfter > 0 && totalPolls > forceAfter)
+ {
+ logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. Gossip Stage active/pending/completed: {}/{}/{}",
+ totalPolls, active, pending, completed);
+ break;
+ }
+ }
+ logger.info("gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
+ }
+
public static void stop(String[] args)
{
instance.deactivate();
[4/5] git commit: Wait for gossip to settle before accepting client
connections patch by Chris Burroughs;
reviewed by Tyler Hobbs and jbellis for CASSANDRA-4288
Posted by jb...@apache.org.
Wait for gossip to settle before accepting client connections
patch by Chris Burroughs; reviewed by Tyler Hobbs and jbellis for CASSANDRA-4288
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de19f963
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de19f963
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de19f963
Branch: refs/heads/cassandra-2.0
Commit: de19f963aeed2752374d2f84c1b230f6cab253f1
Parents: f8fd7db
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Jan 8 20:53:46 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Jan 8 20:53:46 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/CassandraDaemon.java | 50 ++++++++++++++++++++
2 files changed, 51 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de19f963/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d0b63a0..e96a8e0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.5
+ * Wait for gossip to settle before accepting client connections (CASSANDRA-4288)
* Delete unfinished compaction incrementally (CASSANDRA-6086)
* Allow specifying custom secondary index options in CQL3 (CASSANDRA-6480)
Merged from 1.2:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de19f963/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index d36b0db..d497a38 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -34,10 +34,14 @@ import javax.management.StandardMBean;
import com.addthis.metrics.reporter.config.ReporterConfig;
import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
@@ -365,6 +369,8 @@ public class CassandraDaemon
}
}
+ waitForGossipToSettle();
+
// Thrift
InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
int rpcPort = DatabaseDescriptor.getRpcPort();
@@ -489,6 +495,50 @@ public class CassandraDaemon
destroy();
}
+
+ private void waitForGossipToSettle()
+ {
+ int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
+ if (forceAfter == 0)
+ {
+ return;
+ }
+ final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000;
+ final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000;
+ final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
+
+ logger.info("waiting for gossip to settle before accepting client requests...");
+ Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
+ int totalPolls = 0;
+ int numOkay = 0;
+ JMXEnabledThreadPoolExecutor gossipStage = (JMXEnabledThreadPoolExecutor)StageManager.getStage(Stage.GOSSIP);
+ while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
+ {
+ Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ long completed = gossipStage.getCompletedTasks();
+ long active = gossipStage.getActiveCount();
+ long pending = gossipStage.getPendingTasks();
+ totalPolls++;
+ if (active == 0 && pending == 0)
+ {
+ logger.debug("gossip looks settled. CompletedTasks: {}", completed);
+ numOkay++;
+ }
+ else
+ {
+ logger.info("gossip not settled after {} polls. Gossip Stage active/pending/completed: {}/{}/{}", totalPolls, active, pending, completed);
+ numOkay = 0;
+ }
+ if (forceAfter > 0 && totalPolls > forceAfter)
+ {
+ logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. Gossip Stage active/pending/completed: {}/{}/{}",
+ totalPolls, active, pending, completed);
+ break;
+ }
+ }
+ logger.info("gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
+ }
+
public static void stop(String[] args)
{
instance.deactivate();
[2/5] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by jb...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Conflicts:
src/java/org/apache/cassandra/db/ConsistencyLevel.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8fd7db6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8fd7db6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8fd7db6
Branch: refs/heads/trunk
Commit: f8fd7db67e3ff7268bd2bc96bd1373d54664b7dd
Parents: 5fa6055 32d7cb5
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 8 17:22:55 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 8 17:22:55 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ConsistencyLevel.java | 59 +++++++++++++-------
.../locator/AbstractReplicationStrategy.java | 2 +-
3 files changed, 41 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fd7db6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index faf52a4,cba97d0..d0b63a0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -5,29 -8,10 +5,30 @@@ Merged from 1.2
* fsync compression metadata (CASSANDRA-6531)
* Validate CF existence on execution for prepared statement (CASSANDRA-6535)
* Add ability to throttle batchlog replay (CASSANDRA-6550)
+ * Fix executing LOCAL_QUORUM with SimpleStrategy (CASSANDRA-6545)
-1.2.13
+2.0.4
+ * Allow removing snapshots of no-longer-existing CFs (CASSANDRA-6418)
+ * add StorageService.stopDaemon() (CASSANDRA-4268)
+ * add IRE for invalid CF supplied to get_count (CASSANDRA-5701)
+ * add client encryption support to sstableloader (CASSANDRA-6378)
+ * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468)
+ * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496)
+ * Fix assertion failure in filterColdSSTables (CASSANDRA-6483)
+ * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)
+ * Fix cleanup ClassCastException (CASSANDRA-6462)
+ * Reduce gossip memory use by interning VersionedValue strings (CASSANDRA-6410)
+ * Allow specifying datacenters to participate in a repair (CASSANDRA-6218)
+ * Fix divide-by-zero in PCI (CASSANDRA-6403)
+ * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
+ * Add millisecond precision formats to the timestamp parser (CASSANDRA-6395)
+ * Expose a total memtable size metric for a CF (CASSANDRA-6391)
+ * cqlsh: handle symlinks properly (CASSANDRA-6425)
+ * Fix potential infinite loop when paging query with IN (CASSANDRA-6464)
+ * Fix assertion error in AbstractQueryPager.discardFirst (CASSANDRA-6447)
+ * Fix streaming older SSTable yields unnecessary tombstones (CASSANDRA-6527)
+Merged from 1.2:
* Improved error message on bad properties in DDL queries (CASSANDRA-6453)
* Randomize batchlog candidates selection (CASSANDRA-6481)
* Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345, 6485)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fd7db6/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 0f6aba7,3737c73..6d04314
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@@ -89,12 -88,19 +89,19 @@@ public enum ConsistencyLeve
return codeIdx[code];
}
- private int quorumFor(Table table)
++ private int quorumFor(Keyspace keyspace)
+ {
- return (table.getReplicationStrategy().getReplicationFactor() / 2) + 1;
++ return (keyspace.getReplicationStrategy().getReplicationFactor() / 2) + 1;
+ }
+
- private int localQuorumFor(Table table, String dc)
+ private int localQuorumFor(Keyspace keyspace, String dc)
{
- return (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1;
- return (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
- ? (((NetworkTopologyStrategy) table.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1
- : quorumFor(table);
++ return (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
++ ? (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1
++ : quorumFor(keyspace);
}
- public int blockFor(Table table)
+ public int blockFor(Keyspace keyspace)
{
switch (this)
{
@@@ -108,17 -114,24 +115,24 @@@
case THREE:
return 3;
case QUORUM:
- return (keyspace.getReplicationStrategy().getReplicationFactor() / 2) + 1;
- return quorumFor(table);
++ return quorumFor(keyspace);
case ALL:
- return table.getReplicationStrategy().getReplicationFactor();
+ return keyspace.getReplicationStrategy().getReplicationFactor();
case LOCAL_QUORUM:
- return localQuorumFor(table, DatabaseDescriptor.getLocalDataCenter());
+ return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter());
case EACH_QUORUM:
- NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
- int n = 0;
- for (String dc : strategy.getDatacenters())
- n += localQuorumFor(keyspace, dc);
- return n;
- if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
++ if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+ {
- NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) table.getReplicationStrategy();
++ NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
+ int n = 0;
+ for (String dc : strategy.getDatacenters())
- n += localQuorumFor(table, dc);
++ n += localQuorumFor(keyspace, dc);
+ return n;
+ }
+ else
+ {
- return quorumFor(table);
++ return quorumFor(keyspace);
+ }
default:
throw new UnsupportedOperationException("Invalid consistency level: " + toString());
}
@@@ -211,16 -224,20 +225,20 @@@
case LOCAL_ONE:
return countLocalEndpoints(liveEndpoints) >= 1;
case LOCAL_QUORUM:
- return countLocalEndpoints(liveEndpoints) >= blockFor(table);
+ return countLocalEndpoints(liveEndpoints) >= blockFor(keyspace);
case EACH_QUORUM:
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
- if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
++ if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
{
- if (entry.getValue() < localQuorumFor(keyspace, entry.getKey()))
- return false;
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
++ for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
+ {
- if (entry.getValue() < localQuorumFor(table, entry.getKey()))
++ if (entry.getValue() < localQuorumFor(keyspace, entry.getKey()))
+ return false;
+ }
+ return true;
}
- return true;
+ // Fallthrough on purpose for SimpleStrategy
default:
- return Iterables.size(liveEndpoints) >= blockFor(table);
+ return Iterables.size(liveEndpoints) >= blockFor(keyspace);
}
}
@@@ -251,14 -268,18 +269,18 @@@
}
break;
case EACH_QUORUM:
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
- if (table.getReplicationStrategy() instanceof NetworkTopologyStrategy)
++ if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
{
- int dcBlockFor = localQuorumFor(keyspace, entry.getKey());
- int dcLive = entry.getValue();
- if (dcLive < dcBlockFor)
- throw new UnavailableException(this, dcBlockFor, dcLive);
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(table, liveEndpoints).entrySet())
++ for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
+ {
- int dcBlockFor = localQuorumFor(table, entry.getKey());
++ int dcBlockFor = localQuorumFor(keyspace, entry.getKey());
+ int dcLive = entry.getValue();
+ if (dcLive < dcBlockFor)
+ throw new UnavailableException(this, dcBlockFor, dcLive);
+ }
+ break;
}
- break;
+ // Fallthrough on purpose for SimpleStrategy
default:
int live = Iterables.size(liveEndpoints);
if (live < blockFor)
@@@ -281,42 -302,8 +303,39 @@@
}
}
- public void validateForWrite(String table) throws InvalidRequestException
+ public void validateForWrite(String keyspaceName) throws InvalidRequestException
+ {
+ switch (this)
+ {
- case EACH_QUORUM:
- requireNetworkTopologyStrategy(keyspaceName);
- break;
+ case SERIAL:
+ case LOCAL_SERIAL:
+ throw new InvalidRequestException("You must use conditional updates for serializable writes");
+ }
+ }
+
+ // This is the same as validateForWrite really, but we include a slightly different error message for SERIAL/LOCAL_SERIAL
+ public void validateForCasCommit(String keyspaceName) throws InvalidRequestException
+ {
+ switch (this)
+ {
+ case EACH_QUORUM:
+ requireNetworkTopologyStrategy(keyspaceName);
+ break;
+ case SERIAL:
+ case LOCAL_SERIAL:
+ throw new InvalidRequestException(this + " is not supported as conditional update commit consistency. Use ANY if you mean \"make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads\"");
+ }
+ }
+
+ public void validateForCas() throws InvalidRequestException
{
+ if (!isSerialConsistency())
+ throw new InvalidRequestException("Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL");
+ }
+
+ public boolean isSerialConsistency()
+ {
+ return this == SERIAL || this == LOCAL_SERIAL;
}
public void validateCounterForWrite(CFMetaData metadata) throws InvalidRequestException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fd7db6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index f4a2662,e4dd422..df33813
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@@ -132,20 -132,20 +132,20 @@@ public abstract class AbstractReplicati
if (consistency_level.isDatacenterLocal())
{
// block for in this context will be localnodes block.
- return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
+ return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
}
- else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
+ else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
{
- return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
+ return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
}
- return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getTable(), callback, writeType);
+ return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
}
- private Table getTable()
+ private Keyspace getKeyspace()
{
- if (table == null)
- table = Table.open(tableName);
- return table;
+ if (keyspace == null)
+ keyspace = Keyspace.open(keyspaceName);
+ return keyspace;
}
/**
[5/5] git commit: merge from 2.0
Posted by jb...@apache.org.
merge from 2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/db07b20e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/db07b20e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/db07b20e
Branch: refs/heads/trunk
Commit: db07b20edbcd2a23b0669e64e466cd13ce47e2f3
Parents: c2294aa de19f96
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Jan 8 20:54:51 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Jan 8 20:54:51 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/db/ConsistencyLevel.java | 59 +++++++++++++-------
.../locator/AbstractReplicationStrategy.java | 2 +-
.../cassandra/service/CassandraDaemon.java | 51 ++++++++++++++++-
4 files changed, 92 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db07b20e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 58a0906,e96a8e0..0a8b9b9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,31 -1,5 +1,32 @@@
+2.1
+ * Introduce AtomicBTreeColumns (CASSANDRA-6271)
+ * Multithreaded commitlog (CASSANDRA-3578)
+ * allocate fixed index summary memory pool and resample cold index summaries
+ to use less memory (CASSANDRA-5519)
+ * Removed multithreaded compaction (CASSANDRA-6142)
+ * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
+ * change logging from log4j to logback (CASSANDRA-5883)
+ * switch to LZ4 compression for internode communication (CASSANDRA-5887)
+ * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
+ * Remove 1.2 network compatibility code (CASSANDRA-5960)
+ * Remove leveled json manifest migration code (CASSANDRA-5996)
+ * Remove CFDefinition (CASSANDRA-6253)
+ * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
+ * User-defined types for CQL3 (CASSANDRA-5590)
+ * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
+ * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
+ * Secondary index support for collections (CASSANDRA-4511, 6383)
+ * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
+ * Push composites support in the storage engine
+ (CASSANDRA-5417, CASSANDRA-6520)
+ * Add snapshot space used to cfstats (CASSANDRA-6231)
+ * Add cardinality estimator for key count estimation (CASSANDRA-5906)
+ * CF id is changed to be non-deterministic. Data dir/key cache are created
+ uniquely for CF id (CASSANDRA-5202)
+
+
2.0.5
+ * Wait for gossip to settle before accepting client connections (CASSANDRA-4288)
* Delete unfinished compaction incrementally (CASSANDRA-6086)
* Allow specifying custom secondary index options in CQL3 (CASSANDRA-6480)
Merged from 1.2:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/db07b20e/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index ccabad5,d497a38..260dcb2
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -30,8 -32,10 +30,8 @@@ import javax.management.ObjectName
import javax.management.StandardMBean;
import com.addthis.metrics.reporter.config.ReporterConfig;
--
import com.google.common.collect.Iterables;
+ import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;