You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:36:10 UTC
[18/18] cassandra git commit: Transient Replication and Cheap Quorums
Transient Replication and Cheap Quorums
Patch by Blake Eggleston, Benedict Elliott Smith, Marcus Eriksson, Alex Petrov, Ariel Weisberg; Reviewed by Blake Eggleston, Marcus Eriksson, Benedict Elliott Smith, Alex Petrov, Ariel Weisberg for CASSANDRA-14404
Co-authored-by: Blake Eggleston <bd...@gmail.com>
Co-authored-by: Benedict Elliott Smith <be...@apache.org>
Co-authored-by: Marcus Eriksson <ma...@apache.org>
Co-authored-by: Alex Petrov <ol...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f7431b43
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f7431b43
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f7431b43
Branch: refs/heads/trunk
Commit: f7431b432875e334170ccdb19934d05545d2cebd
Parents: 5b645de
Author: Ariel Weisberg <ar...@weisberg.ws>
Authored: Thu Jul 5 18:10:40 2018 -0400
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Fri Aug 31 21:34:22 2018 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 4 +
conf/cassandra.yaml | 4 +
doc/source/architecture/dynamo.rst | 29 +
doc/source/cql/ddl.rst | 14 +-
...iver-internal-only-3.12.0.post0-5838e2fd.zip | Bin 0 -> 269418 bytes
pylib/cqlshlib/cql3handling.py | 1 +
pylib/cqlshlib/cqlshhandling.py | 1 +
pylib/cqlshlib/test/test_cqlsh_completion.py | 6 +-
pylib/cqlshlib/test/test_cqlsh_output.py | 3 +-
.../cassandra/batchlog/BatchlogManager.java | 45 +-
.../org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 35 +-
.../apache/cassandra/cql3/QueryProcessor.java | 13 +-
.../cql3/statements/BatchStatement.java | 4 +-
.../cql3/statements/BatchUpdatesCollector.java | 2 +-
.../cql3/statements/ModificationStatement.java | 4 +-
.../statements/SingleTableUpdatesCollector.java | 2 +-
.../cql3/statements/UpdatesCollector.java | 5 +-
.../schema/AlterKeyspaceStatement.java | 86 +-
.../statements/schema/AlterTableStatement.java | 7 +
.../statements/schema/CreateIndexStatement.java | 5 +
.../statements/schema/CreateTableStatement.java | 9 +
.../statements/schema/CreateViewStatement.java | 5 +
.../cql3/statements/schema/TableAttributes.java | 3 +
.../apache/cassandra/db/ColumnFamilyStore.java | 24 +-
.../apache/cassandra/db/ConsistencyLevel.java | 211 +++--
.../cassandra/db/DiskBoundaryManager.java | 39 +-
src/java/org/apache/cassandra/db/Memtable.java | 1 +
.../cassandra/db/MutationVerbHandler.java | 5 +-
.../cassandra/db/PartitionRangeReadCommand.java | 28 +-
.../org/apache/cassandra/db/ReadCommand.java | 33 +-
.../apache/cassandra/db/SSTableImporter.java | 6 +-
.../db/SinglePartitionReadCommand.java | 26 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 98 ++-
.../cassandra/db/SystemKeyspaceMigrator40.java | 45 +
.../org/apache/cassandra/db/TableCQLHelper.java | 1 +
.../compaction/AbstractCompactionStrategy.java | 3 +-
.../db/compaction/AbstractStrategyHolder.java | 7 +-
.../db/compaction/CompactionManager.java | 295 ++++---
.../db/compaction/CompactionStrategyHolder.java | 34 +-
.../compaction/CompactionStrategyManager.java | 108 +--
.../cassandra/db/compaction/CompactionTask.java | 26 +-
.../db/compaction/PendingRepairHolder.java | 42 +-
.../db/compaction/PendingRepairManager.java | 45 +-
.../cassandra/db/compaction/Scrubber.java | 4 +-
.../cassandra/db/compaction/Upgrader.java | 10 +-
.../cassandra/db/compaction/Verifier.java | 3 +-
.../writers/CompactionAwareWriter.java | 2 +
.../writers/DefaultCompactionWriter.java | 1 +
.../writers/MajorLeveledCompactionWriter.java | 1 +
.../writers/MaxSSTableSizeWriter.java | 1 +
.../SplittingSizeTieredCompactionWriter.java | 1 +
.../db/partitions/PartitionIterators.java | 12 -
.../repair/CassandraKeyspaceRepairManager.java | 10 +-
.../db/repair/PendingAntiCompaction.java | 22 +-
.../db/streaming/CassandraOutgoingFile.java | 11 +-
.../db/streaming/CassandraStreamManager.java | 36 +-
.../db/streaming/CassandraStreamReader.java | 2 +-
.../apache/cassandra/db/view/TableViews.java | 5 +
.../apache/cassandra/db/view/ViewBuilder.java | 19 +-
.../apache/cassandra/db/view/ViewManager.java | 2 +-
.../org/apache/cassandra/db/view/ViewUtils.java | 64 +-
src/java/org/apache/cassandra/dht/Range.java | 27 +-
.../cassandra/dht/RangeFetchMapCalculator.java | 58 +-
.../org/apache/cassandra/dht/RangeStreamer.java | 571 ++++++++----
src/java/org/apache/cassandra/dht/Splitter.java | 95 +-
.../apache/cassandra/dht/StreamStateStore.java | 25 +-
.../ReplicationAwareTokenAllocator.java | 2 +-
.../dht/tokenallocator/TokenAllocation.java | 6 +-
.../exceptions/UnavailableException.java | 20 +-
.../org/apache/cassandra/gms/EndpointState.java | 5 +
.../apache/cassandra/hints/HintsService.java | 21 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 3 +-
.../apache/cassandra/io/sstable/SSTable.java | 13 +
.../cassandra/io/sstable/SSTableLoader.java | 2 +-
.../cassandra/io/sstable/SSTableTxnWriter.java | 14 +-
.../io/sstable/SimpleSSTableMultiWriter.java | 3 +-
.../sstable/format/RangeAwareSSTableWriter.java | 8 +-
.../io/sstable/format/SSTableReader.java | 5 +
.../io/sstable/format/SSTableWriter.java | 17 +-
.../cassandra/io/sstable/format/Version.java | 2 +
.../io/sstable/format/big/BigFormat.java | 17 +-
.../io/sstable/format/big/BigTableWriter.java | 3 +-
.../sstable/metadata/IMetadataSerializer.java | 4 +-
.../io/sstable/metadata/MetadataCollector.java | 8 +-
.../io/sstable/metadata/MetadataSerializer.java | 4 +-
.../io/sstable/metadata/StatsMetadata.java | 52 +-
.../locator/AbstractEndpointSnitch.java | 38 +-
.../locator/AbstractNetworkTopologySnitch.java | 5 +-
.../locator/AbstractReplicaCollection.java | 264 ++++++
.../locator/AbstractReplicationStrategy.java | 142 +--
.../locator/DynamicEndpointSnitch.java | 67 +-
.../org/apache/cassandra/locator/Ec2Snitch.java | 2 +-
.../org/apache/cassandra/locator/Endpoints.java | 157 ++++
.../cassandra/locator/EndpointsByRange.java | 63 ++
.../cassandra/locator/EndpointsByReplica.java | 61 ++
.../cassandra/locator/EndpointsForRange.java | 188 ++++
.../cassandra/locator/EndpointsForToken.java | 172 ++++
.../cassandra/locator/IEndpointSnitch.java | 18 +-
.../cassandra/locator/InetAddressAndPort.java | 5 +-
.../apache/cassandra/locator/LocalStrategy.java | 29 +-
.../locator/NetworkTopologyStrategy.java | 87 +-
.../locator/OldNetworkTopologyStrategy.java | 40 +-
.../cassandra/locator/PendingRangeMaps.java | 161 ++--
.../cassandra/locator/RangesAtEndpoint.java | 313 +++++++
.../cassandra/locator/RangesByEndpoint.java | 54 ++
.../org/apache/cassandra/locator/Replica.java | 196 +++++
.../cassandra/locator/ReplicaCollection.java | 160 ++++
.../apache/cassandra/locator/ReplicaLayout.java | 381 ++++++++
.../cassandra/locator/ReplicaMultimap.java | 127 +++
.../org/apache/cassandra/locator/Replicas.java | 83 ++
.../cassandra/locator/ReplicationFactor.java | 130 +++
.../apache/cassandra/locator/SimpleSnitch.java | 8 +-
.../cassandra/locator/SimpleStrategy.java | 37 +-
.../cassandra/locator/SystemReplicas.java | 62 ++
.../apache/cassandra/locator/TokenMetadata.java | 102 ++-
.../cassandra/metrics/KeyspaceMetrics.java | 43 +-
.../cassandra/metrics/ReadRepairMetrics.java | 1 +
.../apache/cassandra/metrics/TableMetrics.java | 17 +-
.../apache/cassandra/net/IAsyncCallback.java | 11 +-
.../apache/cassandra/net/MessagingService.java | 38 +-
.../apache/cassandra/net/WriteCallbackInfo.java | 15 +-
.../cassandra/repair/AbstractSyncTask.java | 31 +
.../repair/AsymmetricLocalSyncTask.java | 7 +-
.../repair/AsymmetricRemoteSyncTask.java | 6 +
.../cassandra/repair/AsymmetricSyncTask.java | 10 +-
.../apache/cassandra/repair/CommonRange.java | 82 ++
.../cassandra/repair/KeyspaceRepairManager.java | 8 +-
.../apache/cassandra/repair/LocalSyncTask.java | 135 ---
.../apache/cassandra/repair/RemoteSyncTask.java | 74 --
.../org/apache/cassandra/repair/RepairJob.java | 42 +-
.../apache/cassandra/repair/RepairRunnable.java | 87 +-
.../apache/cassandra/repair/RepairSession.java | 57 +-
.../cassandra/repair/StreamingRepairTask.java | 8 +-
.../repair/SymmetricLocalSyncTask.java | 142 +++
.../repair/SymmetricRemoteSyncTask.java | 84 ++
.../cassandra/repair/SymmetricSyncTask.java | 94 ++
.../org/apache/cassandra/repair/SyncTask.java | 97 ---
.../repair/SystemDistributedKeyspace.java | 6 +-
.../repair/consistent/LocalSessions.java | 36 +-
.../apache/cassandra/schema/KeyspaceParams.java | 5 +
.../cassandra/schema/ReplicationParams.java | 9 +-
.../apache/cassandra/schema/SchemaKeyspace.java | 4 +
.../apache/cassandra/schema/TableMetadata.java | 6 +
.../apache/cassandra/schema/TableParams.java | 11 +
.../service/AbstractWriteResponseHandler.java | 95 +-
.../cassandra/service/ActiveRepairService.java | 52 +-
.../service/BatchlogResponseHandler.java | 2 +-
.../DatacenterSyncWriteResponseHandler.java | 21 +-
.../service/DatacenterWriteResponseHandler.java | 26 +-
.../service/PendingRangeCalculatorService.java | 2 +-
.../apache/cassandra/service/StorageProxy.java | 672 ++++++---------
.../cassandra/service/StorageService.java | 860 ++++++++++++-------
.../cassandra/service/StorageServiceMBean.java | 2 +
.../cassandra/service/WriteResponseHandler.java | 25 +-
.../service/reads/AbstractReadExecutor.java | 243 +++---
.../cassandra/service/reads/DataResolver.java | 83 +-
.../cassandra/service/reads/DigestResolver.java | 79 +-
.../cassandra/service/reads/ReadCallback.java | 57 +-
.../service/reads/ResponseResolver.java | 32 +-
.../reads/ShortReadPartitionsProtection.java | 36 +-
.../service/reads/ShortReadProtection.java | 3 +-
.../service/reads/ShortReadRowsProtection.java | 6 +-
.../reads/repair/AbstractReadRepair.java | 90 +-
.../reads/repair/BlockingPartitionRepair.java | 73 +-
.../reads/repair/BlockingReadRepair.java | 29 +-
.../reads/repair/BlockingReadRepairs.java | 19 -
.../service/reads/repair/NoopReadRepair.java | 15 +-
.../repair/PartitionIteratorMergeListener.java | 14 +-
.../reads/repair/ReadOnlyReadRepair.java | 15 +-
.../service/reads/repair/ReadRepair.java | 39 +-
.../reads/repair/ReadRepairDiagnostics.java | 5 +-
.../service/reads/repair/ReadRepairEvent.java | 11 +-
.../reads/repair/ReadRepairStrategy.java | 13 +-
.../reads/repair/RowIteratorMergeListener.java | 58 +-
.../streaming/DefaultConnectionFactory.java | 7 +-
.../apache/cassandra/streaming/StreamPlan.java | 38 +-
.../cassandra/streaming/StreamRequest.java | 98 ++-
.../cassandra/streaming/StreamSession.java | 54 +-
.../cassandra/streaming/TableStreamManager.java | 7 +-
.../org/apache/cassandra/tools/NodeProbe.java | 5 +
.../org/apache/cassandra/tools/NodeTool.java | 2 +
.../tools/SSTableRepairedAtSetter.java | 4 +-
.../cassandra/tools/nodetool/GetReplicas.java | 47 +
.../apache/cassandra/tracing/TraceState.java | 2 +-
.../transport/messages/ErrorMessage.java | 2 +-
src/java/org/apache/cassandra/utils/Pair.java | 12 +
.../cassandra/utils/concurrent/Accumulator.java | 27 +-
.../legacy_na_clust/na-1-big-CompressionInfo.db | Bin 87 -> 87 bytes
.../legacy_na_clust/na-1-big-Data.db | Bin 5259 -> 5214 bytes
.../legacy_na_clust/na-1-big-Digest.crc32 | 2 +-
.../legacy_na_clust/na-1-big-Index.db | Bin 157553 -> 157553 bytes
.../legacy_na_clust/na-1-big-Statistics.db | Bin 7095 -> 7096 bytes
.../legacy_na_clust/na-1-big-TOC.txt | 8 +-
.../na-1-big-CompressionInfo.db | Bin 79 -> 79 bytes
.../legacy_na_clust_counter/na-1-big-Data.db | Bin 5888 -> 5759 bytes
.../na-1-big-Digest.crc32 | 2 +-
.../legacy_na_clust_counter/na-1-big-Index.db | Bin 157553 -> 157553 bytes
.../na-1-big-Statistics.db | Bin 7104 -> 7105 bytes
.../legacy_na_clust_counter/na-1-big-TOC.txt | 8 +-
.../legacy_na_simple/na-1-big-Data.db | Bin 89 -> 88 bytes
.../legacy_na_simple/na-1-big-Digest.crc32 | 2 +-
.../legacy_na_simple/na-1-big-Statistics.db | Bin 4648 -> 4649 bytes
.../legacy_na_simple/na-1-big-TOC.txt | 8 +-
.../legacy_na_simple_counter/na-1-big-Data.db | Bin 140 -> 138 bytes
.../na-1-big-Digest.crc32 | 2 +-
.../na-1-big-Statistics.db | Bin 4657 -> 4658 bytes
.../legacy_na_simple_counter/na-1-big-TOC.txt | 8 +-
.../locator/DynamicEndpointSnitchLongTest.java | 20 +-
.../cassandra/streaming/LongStreamingTest.java | 11 +-
.../test/microbench/PendingRangesBench.java | 27 +-
test/unit/org/apache/cassandra/Util.java | 20 +
.../config/DatabaseDescriptorRefTest.java | 5 +-
.../org/apache/cassandra/cql3/CQLTester.java | 3 +-
.../cql3/validation/operations/CreateTest.java | 3 +-
.../org/apache/cassandra/db/CleanupTest.java | 9 +-
.../cassandra/db/CleanupTransientTest.java | 195 +++++
.../org/apache/cassandra/db/ImportTest.java | 2 +-
.../db/RepairedDataTombstonesTest.java | 2 +-
.../apache/cassandra/db/RowUpdateBuilder.java | 6 +
.../unit/org/apache/cassandra/db/ScrubTest.java | 6 +-
.../db/SystemKeyspaceMigrator40Test.java | 26 +
.../apache/cassandra/db/TableCQLHelperTest.java | 3 +
.../org/apache/cassandra/db/VerifyTest.java | 4 +-
.../compaction/AbstractPendingRepairTest.java | 13 +-
.../db/compaction/AntiCompactionTest.java | 234 +++--
...pactionStrategyManagerPendingRepairTest.java | 163 +++-
.../CompactionStrategyManagerTest.java | 50 +-
.../db/compaction/CompactionTaskTest.java | 10 +-
.../db/compaction/CompactionsCQLTest.java | 4 +-
.../LeveledCompactionStrategyTest.java | 2 +-
.../db/compaction/PendingRepairManagerTest.java | 28 +-
.../db/compaction/SingleSSTableLCSTaskTest.java | 6 +-
.../db/lifecycle/LogTransactionTest.java | 2 +-
.../db/lifecycle/RealTransactionsTest.java | 1 +
...tionManagerGetSSTablesForValidationTest.java | 4 +-
.../db/repair/PendingAntiCompactionTest.java | 39 +-
.../db/streaming/CassandraOutgoingFileTest.java | 1 +
.../streaming/CassandraStreamManagerTest.java | 16 +-
.../db/streaming/StreamRequestTest.java | 98 +++
.../apache/cassandra/db/view/ViewUtilsTest.java | 23 +-
.../apache/cassandra/dht/BootStrapperTest.java | 34 +-
.../dht/RangeFetchMapCalculatorTest.java | 138 ++-
.../org/apache/cassandra/dht/RangeTest.java | 12 +-
.../org/apache/cassandra/dht/SplitterTest.java | 63 +-
.../cassandra/dht/StreamStateStoreTest.java | 5 +-
.../gms/PendingRangeCalculatorServiceTest.java | 2 +-
.../io/sstable/BigTableWriterTest.java | 2 +-
.../io/sstable/CQLSSTableWriterTest.java | 2 +-
.../cassandra/io/sstable/LegacySSTableTest.java | 9 +-
.../cassandra/io/sstable/SSTableLoaderTest.java | 7 +-
.../io/sstable/SSTableRewriterTest.java | 2 +-
.../cassandra/io/sstable/SSTableUtils.java | 2 +-
.../cassandra/io/sstable/SSTableWriterTest.java | 60 ++
.../io/sstable/SSTableWriterTestBase.java | 11 +-
.../format/SSTableFlushObserverTest.java | 2 +-
.../metadata/MetadataSerializerTest.java | 2 +-
.../locator/DynamicEndpointSnitchTest.java | 42 +-
.../locator/NetworkTopologyStrategyTest.java | 92 +-
.../locator/OldNetworkTopologyStrategyTest.java | 28 +-
.../cassandra/locator/PendingRangeMapsTest.java | 49 +-
.../locator/ReplicaCollectionTest.java | 468 ++++++++++
.../apache/cassandra/locator/ReplicaUtils.java | 44 +
.../locator/ReplicationFactorTest.java | 73 ++
.../ReplicationStrategyEndpointCacheTest.java | 56 +-
.../cassandra/locator/SimpleStrategyTest.java | 81 +-
.../cassandra/locator/TokenMetadataTest.java | 8 +-
.../cassandra/net/WriteCallbackInfoTest.java | 7 +-
.../async/OutboundMessagingConnectionTest.java | 3 +-
.../cassandra/repair/LocalSyncTaskTest.java | 191 ----
.../cassandra/repair/RepairRunnableTest.java | 12 +-
.../cassandra/repair/RepairSessionTest.java | 6 +-
.../repair/SymmetricLocalSyncTaskTest.java | 232 +++++
.../repair/SymmetricRemoteSyncTaskTest.java | 71 ++
.../repair/consistent/LocalSessionAccessor.java | 3 +-
.../repair/consistent/LocalSessionTest.java | 9 +-
.../org/apache/cassandra/schema/MockSchema.java | 2 +-
.../service/ActiveRepairServiceTest.java | 50 +-
.../service/BootstrapTransientTest.java | 179 ++++
.../service/LeaveAndBootstrapTest.java | 48 +-
.../org/apache/cassandra/service/MoveTest.java | 324 +++----
.../cassandra/service/MoveTransientTest.java | 638 ++++++++++++++
.../cassandra/service/StorageServiceTest.java | 148 ++++
.../service/WriteResponseHandlerTest.java | 58 +-
.../WriteResponseHandlerTransientTest.java | 224 +++++
.../service/reads/AbstractReadResponseTest.java | 300 +++++++
.../service/reads/DataResolverTest.java | 486 +++++------
.../reads/DataResolverTransientTest.java | 226 +++++
.../service/reads/DigestResolverTest.java | 144 ++++
.../service/reads/ReadExecutorTest.java | 46 +-
.../reads/repair/AbstractReadRepairTest.java | 53 +-
.../reads/repair/BlockingReadRepairTest.java | 113 ++-
.../DiagEventsBlockingReadRepairTest.java | 45 +-
.../reads/repair/InstrumentedReadRepair.java | 4 +-
.../reads/repair/ReadOnlyReadRepairTest.java | 30 +-
.../service/reads/repair/ReadRepairTest.java | 353 ++++++++
.../reads/repair/TestableReadRepair.java | 50 +-
.../streaming/StreamingTransferTest.java | 7 +-
.../utils/concurrent/AccumulatorTest.java | 2 +-
300 files changed, 11945 insertions(+), 4347 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9e76586..b53b986 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Transient Replication and Cheap Quorums (CASSANDRA-14404)
* Log server-generated timestamp and nowInSeconds used by queries in FQL (CASSANDRA-14675)
* Add diagnostic events for read repairs (CASSANDRA-14668)
* Use consistent nowInSeconds and timestamps values within a request (CASSANDRA-14671)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index aa8281c..5066378 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,10 @@ using the provided 'sstableupgrade' tool.
New features
------------
+ - *Experimental* support for Transient Replication and Cheap Quorums introduced by CASSANDRA-14404
+ The intended audience for this functionality is expert users of Cassandra who are prepared
+ to validate every aspect of the database for their application and deployment practices. Future
+ releases of Cassandra will make this feature suitable for a wider audience.
- *Experimental* support for Java 11 has been added. JVM options that differ between or are
specific for Java 8 and 11 have been moved from jvm.options into jvm8.options and jvm11.options.
IMPORTANT: Running C* on Java 11 is *experimental* and do it at your own risk.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 064ee4f..503a0fa 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1052,6 +1052,10 @@ enable_scripted_user_defined_functions: false
# Materialized views are considered experimental and are not recommended for production use.
enable_materialized_views: true
+# Enables creation of transiently replicated keyspaces on this node.
+# Transient replication is experimental and is not recommended for production use.
+#enable_transient_replication: true
+
# The default Windows kernel timer and scheduling resolution is 15.6ms for power conservation.
# Lowering this value on Windows can provide much tighter latency and better throughput, however
# some virtualized environments may see a negative performance impact from changing this setting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/doc/source/architecture/dynamo.rst
----------------------------------------------------------------------
diff --git a/doc/source/architecture/dynamo.rst b/doc/source/architecture/dynamo.rst
index 365a695..12c586e 100644
--- a/doc/source/architecture/dynamo.rst
+++ b/doc/source/architecture/dynamo.rst
@@ -74,6 +74,35 @@ nodes in each rack, the data load on the smallest rack may be much higher. Simi
into a new rack, it will be considered a replica for the entire ring. For this reason, many operators choose to
configure all nodes on a single "rack".
+.. _transient-replication:
+
+Transient Replication
+~~~~~~~~~~~~~~~~~~~~~
+
+Transient replication allows you to configure a subset of replicas to only replicate data that hasn't been incrementally
+repaired. This allows you to decouple data redundancy from availability. For instance, if you have a keyspace replicated
+at rf 3, and alter it to rf 5 with 2 transient replicas, you go from being able to tolerate one failed replica to being
+able to tolerate two, without a corresponding increase in storage usage. This is because 3 nodes will replicate all the data
+for a given token range, and the other 2 will only replicate data that hasn't been incrementally repaired.
+
+To use transient replication, you first need to enable it in ``cassandra.yaml``. Once enabled, both SimpleStrategy and
+NetworkTopologyStrategy can be configured to transiently replicate data. You configure it by specifying replication factor
+as ``<total_replicas>/<transient_replicas>``. Both SimpleStrategy and NetworkTopologyStrategy support configuring transient
+replication.
+
+Transiently replicated keyspaces only support tables created with read_repair set to NONE; monotonic reads are not currently supported.
+You also can't use LWT, logged batches, or counters in 4.0. You will possibly never be able to use materialized views
+with transiently replicated keyspaces and probably never be able to use 2i with them.
+
+Transient replication is an experimental feature that may not be ready for production use. The expected audience is experienced
+users of Cassandra capable of fully validating a deployment of their particular application. That means being able to check
+that operations like reads, writes, decommission, remove, rebuild, repair, and replace all work with your queries, data,
+configuration, operational practices, and availability requirements.
+
+It is anticipated that 4.next will support monotonic reads with transient replication as well as LWT, logged batches, and
+counters.
+
+
Tunable Consistency
^^^^^^^^^^^^^^^^^^^
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/doc/source/cql/ddl.rst
----------------------------------------------------------------------
diff --git a/doc/source/cql/ddl.rst b/doc/source/cql/ddl.rst
index 9afd638..2d9a50a 100644
--- a/doc/source/cql/ddl.rst
+++ b/doc/source/cql/ddl.rst
@@ -105,6 +105,14 @@ strategy is used. By default, Cassandra support the following ``'class'``:
Attempting to create a keyspace that already exists will return an error unless the ``IF NOT EXISTS`` option is used. If
it is used, the statement will be a no-op if the keyspace already exists.
+If :ref:`transient replication <transient-replication>` has been enabled, transient replicas can be configured for both
+SimpleStrategy and NetworkTopologyStrategy by defining replication factors in the format ``'<total_replicas>/<transient_replicas>'``.
+
+For instance, this keyspace will have 3 replicas in DC1, 1 of which is transient, and 5 replicas in DC2, 2 of which are transient::
+
+ CREATE KEYSPACE some_keyspace
+ WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1' : '3/1', 'DC2' : '5/2'};
+
.. _use-statement:
USE
@@ -455,6 +463,9 @@ A table supports the following options:
| ``speculative_retry`` | *simple* | 99PERCENTILE| :ref:`Speculative retry options |
| | | | <speculative-retry-options>`. |
+--------------------------------+----------+-------------+-----------------------------------------------------------+
+| ``speculative_write_threshold``| *simple* | 99PERCENTILE| :ref:`Speculative retry options |
+| | | | <speculative-retry-options>`. |
++--------------------------------+----------+-------------+-----------------------------------------------------------+
| ``gc_grace_seconds`` | *simple* | 864000 | Time to wait before garbage collecting tombstones |
| | | | (deletion markers). |
+--------------------------------+----------+-------------+-----------------------------------------------------------+
@@ -485,7 +496,8 @@ Speculative retry options
By default, Cassandra read coordinators only query as many replicas as necessary to satisfy
consistency levels: one for consistency level ``ONE``, a quorum for ``QUORUM``, and so on.
``speculative_retry`` determines when coordinators may query additional replicas, which is useful
-when replicas are slow or unresponsive. The following are legal values (case-insensitive):
+when replicas are slow or unresponsive. ``speculative_write_threshold`` specifies the threshold at which
+a cheap quorum write will be upgraded to include transient replicas. The following are legal values (case-insensitive):
============================ ======================== =============================================================================
Format Example Description
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/lib/cassandra-driver-internal-only-3.12.0.post0-5838e2fd.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.12.0.post0-5838e2fd.zip b/lib/cassandra-driver-internal-only-3.12.0.post0-5838e2fd.zip
new file mode 100644
index 0000000..8d627a9
Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.12.0.post0-5838e2fd.zip differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index 5595e2a..405e88e 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -49,6 +49,7 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet):
('max_index_interval', None),
('default_time_to_live', None),
('speculative_retry', None),
+ ('speculative_write_threshold', None),
('memtable_flush_period_in_ms', None),
('cdc', None),
('read_repair', None),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/pylib/cqlshlib/cqlshhandling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cqlshhandling.py b/pylib/cqlshlib/cqlshhandling.py
index 9545876..7abd6ce 100644
--- a/pylib/cqlshlib/cqlshhandling.py
+++ b/pylib/cqlshlib/cqlshhandling.py
@@ -112,6 +112,7 @@ cqlsh_consistency_level_syntax_rules = r'''
| "SERIAL"
| "LOCAL_SERIAL"
| "LOCAL_ONE"
+ | "NODE_LOCAL"
;
'''
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/pylib/cqlshlib/test/test_cqlsh_completion.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/test/test_cqlsh_completion.py b/pylib/cqlshlib/test/test_cqlsh_completion.py
index fa9490d..794c591 100644
--- a/pylib/cqlshlib/test/test_cqlsh_completion.py
+++ b/pylib/cqlshlib/test/test_cqlsh_completion.py
@@ -594,7 +594,7 @@ class TestCqlshCompletion(CqlshCompletionCase):
'memtable_flush_period_in_ms',
'CLUSTERING',
'COMPACT', 'caching', 'comment',
- 'min_index_interval', 'speculative_retry', 'cdc'])
+ 'min_index_interval', 'speculative_retry', 'speculative_write_threshold', 'cdc'])
self.trycompletions(prefix + ' new_table (col_a int PRIMARY KEY) WITH ',
choices=['bloom_filter_fp_chance', 'compaction',
'compression',
@@ -603,7 +603,7 @@ class TestCqlshCompletion(CqlshCompletionCase):
'memtable_flush_period_in_ms',
'CLUSTERING',
'COMPACT', 'caching', 'comment',
- 'min_index_interval', 'speculative_retry', 'cdc'])
+ 'min_index_interval', 'speculative_retry', 'speculative_write_threshold', 'cdc'])
self.trycompletions(prefix + ' new_table (col_a int PRIMARY KEY) WITH bloom_filter_fp_chance ',
immediate='= ')
self.trycompletions(prefix + ' new_table (col_a int PRIMARY KEY) WITH bloom_filter_fp_chance = ',
@@ -650,7 +650,7 @@ class TestCqlshCompletion(CqlshCompletionCase):
'memtable_flush_period_in_ms',
'CLUSTERING',
'COMPACT', 'caching', 'comment',
- 'min_index_interval', 'speculative_retry', 'cdc'])
+ 'min_index_interval', 'speculative_retry', 'speculative_write_threshold', 'cdc'])
self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH compaction = "
+ "{'class': 'DateTieredCompactionStrategy', '",
choices=['base_time_seconds', 'max_sstable_age_days',
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/pylib/cqlshlib/test/test_cqlsh_output.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/test/test_cqlsh_output.py b/pylib/cqlshlib/test/test_cqlsh_output.py
index 2f0d9bf..46546f0 100644
--- a/pylib/cqlshlib/test/test_cqlsh_output.py
+++ b/pylib/cqlshlib/test/test_cqlsh_output.py
@@ -622,7 +622,8 @@ class TestCqlshOutput(BaseTestCase):
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
- AND speculative_retry = '99PERCENTILE';
+ AND speculative_retry = '99PERCENTILE'
+ AND speculative_write_threshold = '99PERCENTILE';
""" % quote_name(get_keyspace()))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 4809bd7..8dda54e 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -26,14 +26,20 @@ import java.util.concurrent.*;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicates;
import com.google.common.collect.*;
import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.EndpointsForToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.*;
@@ -419,7 +425,7 @@ public class BatchlogManager implements BatchlogManagerMBean
if (handler != null)
{
hintedNodes.addAll(handler.undelivered);
- HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
+ HintsService.instance.write(Collections2.transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
Hint.create(undeliveredMutation, writtenAt));
}
}
@@ -449,35 +455,41 @@ public class BatchlogManager implements BatchlogManagerMBean
long writtenAt,
Set<InetAddressAndPort> hintedNodes)
{
- Set<InetAddressAndPort> liveEndpoints = new HashSet<>();
String ks = mutation.getKeyspaceName();
+ Keyspace keyspace = Keyspace.open(ks);
Token tk = mutation.key().getToken();
- for (InetAddressAndPort endpoint : StorageService.instance.getNaturalAndPendingEndpoints(ks, tk))
+ EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(ks, tk);
+ Replicas.temporaryAssertFull(replicas); // TODO in CASSANDRA-14549
+
+ EndpointsForToken.Builder liveReplicasBuilder = EndpointsForToken.builder(tk);
+ for (Replica replica : replicas)
{
- if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
+ if (replica.isLocal())
{
mutation.apply();
}
- else if (FailureDetector.instance.isAlive(endpoint))
+ else if (FailureDetector.instance.isAlive(replica.endpoint()))
{
- liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
+ liveReplicasBuilder.add(replica); // will try delivering directly instead of writing a hint.
}
else
{
- hintedNodes.add(endpoint);
- HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
+ hintedNodes.add(replica.endpoint());
+ HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(replica.endpoint()),
Hint.create(mutation, writtenAt));
}
}
- if (liveEndpoints.isEmpty())
+ EndpointsForToken liveReplicas = liveReplicasBuilder.build();
+ if (liveReplicas.isEmpty())
return null;
- ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints, System.nanoTime());
+ Replicas.temporaryAssertFull(liveReplicas);
+ ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(keyspace, liveReplicas, System.nanoTime());
MessageOut<Mutation> message = mutation.createMessage();
- for (InetAddressAndPort endpoint : liveEndpoints)
- MessagingService.instance().sendRR(message, endpoint, handler, false);
+ for (Replica replica : liveReplicas)
+ MessagingService.instance().sendWriteRR(message, replica, handler, false);
return handler;
}
@@ -497,16 +509,17 @@ public class BatchlogManager implements BatchlogManagerMBean
{
private final Set<InetAddressAndPort> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
- ReplayWriteResponseHandler(Collection<InetAddressAndPort> writeEndpoints, long queryStartNanoTime)
+ ReplayWriteResponseHandler(Keyspace keyspace, EndpointsForToken writeReplicas, long queryStartNanoTime)
{
- super(writeEndpoints, Collections.<InetAddressAndPort>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
- undelivered.addAll(writeEndpoints);
+ super(ReplicaLayout.forWriteWithDownNodes(keyspace, null, writeReplicas.token(), writeReplicas, EndpointsForToken.empty(writeReplicas.token())),
+ null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
+ Iterables.addAll(undelivered, writeReplicas.endpoints());
}
@Override
protected int totalBlockFor()
{
- return this.naturalEndpoints.size();
+ return this.replicaLayout.selected().size();
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index a13070c..783dcc1 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -339,6 +339,8 @@ public class Config
public boolean enable_materialized_views = true;
+ public boolean enable_transient_replication = false;
+
/**
* Optionally disable asynchronous UDF execution.
* Disabling asynchronous UDF execution also implicitly disables the security-manager!
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index af13f9c..75b3fc3 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.apache.cassandra.locator.EndpointSnitchInfo;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.SeedProvider;
import org.apache.cassandra.net.BackPressureStrategy;
import org.apache.cassandra.net.RateBasedBackPressure;
@@ -122,7 +123,7 @@ public class DatabaseDescriptor
private static long indexSummaryCapacityInMB;
private static String localDC;
- private static Comparator<InetAddressAndPort> localComparator;
+ private static Comparator<Replica> localComparator;
private static EncryptionContext encryptionContext;
private static boolean hasLoggedConfig;
@@ -991,18 +992,14 @@ public class DatabaseDescriptor
EndpointSnitchInfo.create();
localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
- localComparator = new Comparator<InetAddressAndPort>()
- {
- public int compare(InetAddressAndPort endpoint1, InetAddressAndPort endpoint2)
- {
- boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
- boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
- if (local1 && !local2)
- return -1;
- if (local2 && !local1)
- return 1;
- return 0;
- }
+ localComparator = (replica1, replica2) -> {
+ boolean local1 = localDC.equals(snitch.getDatacenter(replica1));
+ boolean local2 = localDC.equals(snitch.getDatacenter(replica2));
+ if (local1 && !local2)
+ return -1;
+ if (local2 && !local1)
+ return 1;
+ return 0;
};
}
@@ -2308,7 +2305,7 @@ public class DatabaseDescriptor
return localDC;
}
- public static Comparator<InetAddressAndPort> getLocalComparator()
+ public static Comparator<Replica> getLocalComparator()
{
return localComparator;
}
@@ -2459,6 +2456,16 @@ public class DatabaseDescriptor
return conf.enable_materialized_views;
}
+ /**
+ * Returns the current value of {@code conf.enable_transient_replication}
+ * (declared {@code false} by default in {@code Config}).
+ */
+ public static boolean isTransientReplicationEnabled()
+ {
+ return conf.enable_transient_replication;
+ }
+
+ /**
+ * Overwrites {@code conf.enable_transient_replication} in place. No validation is performed
+ * against existing keyspaces or schema.
+ * NOTE(review): the "Unsafe" suffix suggests this is meant for tests/tooling only — confirm.
+ */
+ public static void setTransientReplicationEnabledUnsafe(boolean enabled)
+ {
+ conf.enable_transient_replication = enabled;
+ }
+
public static long getUserDefinedFunctionFailTimeout()
{
return conf.user_defined_function_fail_timeout;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 79e19c1..45db947 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -202,7 +202,18 @@ public class QueryProcessor implements QueryHandler
statement.authorize(clientState);
statement.validate(clientState);
- ResultMessage result = statement.execute(queryState, options, queryStartNanoTime);
+ ResultMessage result;
+ if (options.getConsistency() == ConsistencyLevel.NODE_LOCAL)
+ {
+ assert Boolean.getBoolean("cassandra.enable_nodelocal_queries") : "Node local consistency level is highly dangerous and should be used only for debugging purposes";
+ assert statement instanceof SelectStatement : "Only SELECT statements are permitted for node-local execution";
+ logger.info("Statement {} executed with NODE_LOCAL consistency level.", statement);
+ result = statement.executeLocally(queryState, options);
+ }
+ else
+ {
+ result = statement.execute(queryState, options, queryStartNanoTime);
+ }
return result == null ? new ResultMessage.Void() : result;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index e925735..fa637ef 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -261,7 +261,7 @@ public class BatchStatement implements CQLStatement
return statements;
}
- private Collection<? extends IMutation> getMutations(BatchQueryOptions options,
+ private List<? extends IMutation> getMutations(BatchQueryOptions options,
boolean local,
long batchTimestamp,
int nowInSeconds,
@@ -401,7 +401,7 @@ public class BatchStatement implements CQLStatement
return new ResultMessage.Void();
}
- private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException
+ private void executeWithoutConditions(List<? extends IMutation> mutations, ConsistencyLevel cl, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException
{
if (mutations.isEmpty())
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
index 96d9f5a..8f70ffc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
@@ -104,7 +104,7 @@ final class BatchUpdatesCollector implements UpdatesCollector
* Returns a collection containing all the mutations.
* @return a collection containing all the mutations.
*/
- public Collection<IMutation> toMutations()
+ public List<IMutation> toMutations()
{
//TODO: The case where all statement where on the same keyspace is pretty common, optimize for that?
List<IMutation> ms = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 13fc659..a8367f0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -465,7 +465,7 @@ public abstract class ModificationStatement implements CQLStatement
else
cl.validateForWrite(metadata.keyspace);
- Collection<? extends IMutation> mutations =
+ List<? extends IMutation> mutations =
getMutations(options,
false,
options.getTimestamp(queryState),
@@ -676,7 +676,7 @@ public abstract class ModificationStatement implements CQLStatement
*
* @return list of the mutations
*/
- private Collection<? extends IMutation> getMutations(QueryOptions options,
+ private List<? extends IMutation> getMutations(QueryOptions options,
boolean local,
long timestamp,
int nowInSeconds,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
index 1def3fd..6ef551d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
@@ -82,7 +82,7 @@ final class SingleTableUpdatesCollector implements UpdatesCollector
* Returns a collection containing all the mutations.
* @return a collection containing all the mutations.
*/
- public Collection<IMutation> toMutations()
+ public List<IMutation> toMutations()
{
List<IMutation> ms = new ArrayList<>();
for (PartitionUpdate.Builder builder : puBuilders.values())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
index 30db7ca..c3dd334 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
@@ -18,17 +18,16 @@
package org.apache.cassandra.cql3.statements;
-import java.util.Collection;
+import java.util.List;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.schema.TableMetadata;
public interface UpdatesCollector
{
PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency);
- Collection<IMutation> toMutations();
+ List<IMutation> toMutations();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
index 12e73d0..2f0c188 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
@@ -17,16 +17,27 @@
*/
package org.apache.cassandra.cql3.statements.schema;
+import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.google.common.collect.ImmutableSet;
import org.apache.cassandra.audit.AuditLogContext;
import org.apache.cassandra.audit.AuditLogEntryType;
import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.locator.ReplicationFactor;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
import org.apache.cassandra.schema.Keyspaces;
@@ -34,9 +45,13 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.transport.Event.SchemaChange;
import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.utils.FBUtilities;
public final class AlterKeyspaceStatement extends AlterSchemaStatement
{
+ private static final boolean allow_alter_rf_during_range_movement = Boolean.getBoolean(Config.PROPERTY_PREFIX + "allow_alter_rf_during_range_movement");
+ private static final boolean allow_unsafe_transient_changes = Boolean.getBoolean(Config.PROPERTY_PREFIX + "allow_unsafe_transient_changes");
+
private final KeyspaceAttributes attrs;
public AlterKeyspaceStatement(String keyspaceName, KeyspaceAttributes attrs)
@@ -60,6 +75,9 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement
newKeyspace.params.validate(keyspaceName);
+ validateNoRangeMovements();
+ validateTransientReplication(keyspace.createReplicationStrategy(), newKeyspace.createReplicationStrategy());
+
return schema.withAddedOrUpdated(newKeyspace);
}
@@ -84,11 +102,77 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement
AbstractReplicationStrategy before = keyspaceDiff.before.createReplicationStrategy();
AbstractReplicationStrategy after = keyspaceDiff.after.createReplicationStrategy();
- return before.getReplicationFactor() < after.getReplicationFactor()
+ return before.getReplicationFactor().fullReplicas < after.getReplicationFactor().fullReplicas
? ImmutableSet.of("When increasing replication factor you need to run a full (-full) repair to distribute the data.")
: ImmutableSet.of();
}
+    /**
+     * Rejects a replication change while any other known endpoint (live or unreachable, per the
+     * gossiper) is not in the normal state. Skipped entirely when the
+     * {@code allow_alter_rf_during_range_movement} system property is set.
+     *
+     * @throws ConfigurationException listing the endpoints that are not in normal state
+     */
+    private void validateNoRangeMovements()
+    {
+        if (allow_alter_rf_during_range_movement)
+            return;
+
+        // The local node is excluded from the check; resolve its address once.
+        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+
+        List<InetAddressAndPort> notNormalEndpoints =
+            Stream.concat(Gossiper.instance.getLiveMembers().stream(),
+                          Gossiper.instance.getUnreachableMembers().stream())
+                  .filter(endpoint -> !localAddress.equals(endpoint))
+                  .filter(endpoint -> !Gossiper.instance.getEndpointStateForEndpoint(endpoint).isNormalState())
+                  .collect(Collectors.toList());
+
+        if (notNormalEndpoints.isEmpty())
+            return;
+
+        throw new ConfigurationException("Cannot alter RF while some endpoints are not in normal state (no range movements): " + notNormalEndpoints);
+    }
+
+    /**
+     * Rejects replication-factor changes that are unsafe for transient replication.
+     * Skipped entirely when the {@code allow_unsafe_transient_changes} system property is set.
+     *
+     * @param oldStrategy replication strategy of the keyspace before the ALTER
+     * @param newStrategy replication strategy the ALTER would install
+     * @throws ConfigurationException if the new RF uses transient replicas together with vnodes,
+     *         materialized views or secondary indexes, if full replicas would be added while
+     *         transient replicas exist, or if the transient count grows by more than one while
+     *         the total number of replicas changes
+     */
+    private void validateTransientReplication(AbstractReplicationStrategy oldStrategy, AbstractReplicationStrategy newStrategy)
+    {
+        // If there is no read traffic there are some extra alterations you can safely make, but this is so atypical
+        // that a good default is to not allow unsafe changes
+        if (allow_unsafe_transient_changes)
+            return;
+
+        ReplicationFactor oldRF = oldStrategy.getReplicationFactor();
+        ReplicationFactor newRF = newStrategy.getReplicationFactor();
+
+        int oldTrans = oldRF.transientReplicas();
+        int oldFull = oldRF.fullReplicas;
+        int newTrans = newRF.transientReplicas();
+        int newFull = newRF.fullReplicas;
+
+        if (newTrans > 0)
+        {
+            if (DatabaseDescriptor.getNumTokens() > 1)
+                throw new ConfigurationException("Transient replication is not supported with vnodes yet");
+
+            Keyspace ks = Keyspace.open(keyspaceName);
+            for (ColumnFamilyStore cfs : ks.getColumnFamilyStores())
+            {
+                if (cfs.viewManager.hasViews())
+                    throw new ConfigurationException("Cannot use transient replication on keyspaces using materialized views");
+
+                if (cfs.indexManager.hasIndexes())
+                    throw new ConfigurationException("Cannot use transient replication on keyspaces using secondary indexes");
+            }
+        }
+
+        // Adding full replicas while transient replicas exist is rejected because the transition
+        // from transient -> full lacks the pending state necessary for correctness: we would attempt
+        // to read from a transient replica as if it were a full replica.
+        // The guard must fire when the number of full replicas GROWS (newFull > oldFull); the previous
+        // "oldFull > newFull" test was inverted relative to this rationale, to the error message, and
+        // to the promise below that full -> transient conversions are always allowed.
+        if (newFull > oldFull && oldTrans > 0)
+            throw new ConfigurationException("Can't add full replicas if there are any transient replicas. You must first remove all transient replicas, then change the # of full replicas, then add back the transient replicas");
+
+        // Don't increase the transient replica count by more than one at a time if the total number of
+        // replicas changes. Just like with changing full replicas it's not safe to do this, as you could read
+        // from too many replicas that don't have the necessary data. W/O transient replication this alteration
+        // was allowed and it's not clear if it should be.
+        // Because the check only applies when the total changes, any number of full replicas can be
+        // converted to transient replicas in a single step.
+        boolean numReplicasChanged = oldTrans + oldFull != newTrans + newFull;
+        if (numReplicasChanged && newTrans > oldTrans + 1)
+            throw new ConfigurationException("Can only safely increase number of transients one at a time with incremental repair run in between each time");
+    }
+
@Override
public AuditLogContext getAuditLogContext()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
index 3ec75b2..5044119 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
import org.apache.cassandra.transport.Event.SchemaChange;
import org.apache.cassandra.transport.Event.SchemaChange.Change;
import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -360,6 +361,12 @@ public abstract class AlterTableStatement extends AlterSchemaStatement
"before being replayed.");
}
+ if (keyspace.createReplicationStrategy().hasTransientReplicas()
+ && params.readRepair != ReadRepairStrategy.NONE)
+ {
+ throw ire("read_repair must be set to 'NONE' for transiently replicated keyspaces");
+ }
+
return keyspace.withSwapped(keyspace.tables.withSwapped(table.withSwapped(params)));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
index df41358..dbca160 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
@@ -28,7 +28,9 @@ import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.QualifiedName;
import org.apache.cassandra.cql3.statements.schema.IndexTarget.Type;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
import org.apache.cassandra.service.ClientState;
@@ -88,6 +90,9 @@ public final class CreateIndexStatement extends AlterSchemaStatement
if (table.isView())
throw ire("Secondary indexes on materialized views aren't supported");
+ if (Keyspace.open(table.keyspace).getReplicationStrategy().hasTransientReplicas())
+ throw new InvalidRequestException("Secondary indexes are not supported on transiently replicated keyspaces");
+
List<IndexTarget> indexTargets = Lists.newArrayList(transform(rawIndexTargets, t -> t.prepare(table)));
if (indexTargets.isEmpty() && !attrs.isCustom)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
index 62fcafe..be7907f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
@@ -27,11 +27,14 @@ import org.apache.cassandra.auth.DataResource;
import org.apache.cassandra.auth.IResource;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.AlreadyExistsException;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
import org.apache.cassandra.transport.Event.SchemaChange;
import org.apache.cassandra.transport.Event.SchemaChange.Change;
import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -98,6 +101,12 @@ public final class CreateTableStatement extends AlterSchemaStatement
TableMetadata table = builder(keyspace.types).build();
table.validate();
+ if (keyspace.createReplicationStrategy().hasTransientReplicas()
+ && table.params.readRepair != ReadRepairStrategy.NONE)
+ {
+ throw ire("read_repair must be set to 'NONE' for transiently replicated keyspaces");
+ }
+
return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.with(table)));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
index 5f62001..bf6bcff 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
@@ -31,9 +31,11 @@ import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
import org.apache.cassandra.cql3.selection.RawSelector;
import org.apache.cassandra.cql3.selection.Selectable;
import org.apache.cassandra.cql3.statements.StatementType;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ReversedType;
import org.apache.cassandra.exceptions.AlreadyExistsException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
import org.apache.cassandra.service.ClientState;
@@ -107,6 +109,9 @@ public final class CreateViewStatement extends AlterSchemaStatement
if (null == keyspace)
throw ire("Keyspace '%s' doesn't exist", keyspaceName);
+ if (keyspace.createReplicationStrategy().hasTransientReplicas())
+ throw new InvalidRequestException("Materialized views are not supported on transiently replicated keyspaces");
+
TableMetadata table = keyspace.tables.getNullable(tableName);
if (null == table)
throw ire("Base table '%s' doesn't exist", tableName);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java b/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java
index c8e464a..4e66307 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java
@@ -128,6 +128,9 @@ public final class TableAttributes extends PropertyDefinitions
if (hasOption(Option.SPECULATIVE_RETRY))
builder.speculativeRetry(SpeculativeRetryPolicy.fromString(getString(Option.SPECULATIVE_RETRY)));
+ if (hasOption(Option.SPECULATIVE_WRITE_THRESHOLD))
+ builder.speculativeWriteThreshold(SpeculativeRetryPolicy.fromString(getString(Option.SPECULATIVE_WRITE_THRESHOLD)));
+
if (hasOption(Option.CRC_CHECK_CHANCE))
builder.crcCheckChance(getDouble(Option.CRC_CHECK_CHANCE));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 5e38584..56851e2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,7 +30,6 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
-import javax.annotation.Nullable;
import javax.management.*;
import javax.management.openmbean.*;
@@ -68,7 +67,6 @@ import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.KeyIterator;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -81,7 +79,6 @@ import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.repair.TableRepairManager;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
-import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.TableStreamManager;
@@ -205,7 +202,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private final Directories directories;
public final TableMetrics metric;
- public volatile long sampleLatencyNanos;
+ public volatile long sampleReadLatencyNanos;
+ public volatile long transientWriteLatencyNanos;
private final CassandraTableWriteHandler writeHandler;
private final CassandraStreamManager streamManager;
@@ -384,7 +382,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
viewManager = keyspace.viewManager.forTable(metadata.id);
metric = new TableMetrics(this);
fileIndexGenerator.set(generation);
- sampleLatencyNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getReadRpcTimeout() / 2);
+ sampleReadLatencyNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getReadRpcTimeout() / 2);
+ transientWriteLatencyNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getWriteRpcTimeout() / 2);
logger.info("Initializing {}.{}", keyspace.getName(), name);
@@ -454,7 +453,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
try
{
- sampleLatencyNanos = metadata().params.speculativeRetry.calculateThreshold(metric.coordinatorReadLatency);
+ sampleReadLatencyNanos = metadata().params.speculativeRetry.calculateThreshold(metric.coordinatorReadLatency);
+ transientWriteLatencyNanos = metadata().params.speculativeWriteThreshold.calculateThreshold(metric.coordinatorWriteLatency);
}
catch (Throwable e)
{
@@ -487,15 +487,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return directories;
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
{
MetadataCollector collector = new MetadataCollector(metadata().comparator).sstableLevel(sstableLevel);
- return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, collector, header, txn);
+ return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, collector, header, txn);
}
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
{
- return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, metadataCollector, header, indexManager.listIndexes(), txn);
+ return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadataCollector, header, indexManager.listIndexes(), txn);
}
public boolean supportsEarlyOpen()
@@ -1402,7 +1402,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// cleanup size estimation only counts bytes for keys local to this node
long expectedFileSize = 0;
- Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalReplicas(keyspace.getName()).ranges();
for (SSTableReader sstable : sstables)
{
List<SSTableReader.PartitionPositionBounds> positions = sstable.getPositionsForRanges(ranges);
@@ -1677,7 +1677,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public void cleanupCache()
{
- Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
+ Collection<Range<Token>> ranges = StorageService.instance.getLocalReplicas(keyspace.getName()).ranges();
for (Iterator<RowCacheKey> keyIter = CacheService.instance.rowCache.keyIterator();
keyIter.hasNext(); )
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index d37da0a..35ba198 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -17,16 +17,18 @@
*/
package org.apache.cassandra.db;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import com.google.common.collect.Iterables;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.Replicas;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -47,7 +49,8 @@ public enum ConsistencyLevel
EACH_QUORUM (7),
SERIAL (8),
LOCAL_SERIAL(9),
- LOCAL_ONE (10, true);
+ LOCAL_ONE (10, true),
+ NODE_LOCAL (11, true);
private static final Logger logger = LoggerFactory.getLogger(ConsistencyLevel.class);
@@ -89,13 +92,13 @@ public enum ConsistencyLevel
private int quorumFor(Keyspace keyspace)
{
+ // strict majority of the keyspace-wide replication factor, taken from getReplicationFactor().allReplicas
- return (keyspace.getReplicationStrategy().getReplicationFactor() / 2) + 1;
+ return (keyspace.getReplicationStrategy().getReplicationFactor().allReplicas / 2) + 1;
}
private int localQuorumFor(Keyspace keyspace, String dc)
{
+ // NetworkTopologyStrategy: strict majority of dc's own replication factor (allReplicas);
+ // otherwise fall back to the keyspace-wide quorumFor
return (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
- ? (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1
+ ? (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc).allReplicas / 2) + 1
: quorumFor(keyspace);
}
@@ -116,7 +119,7 @@ public enum ConsistencyLevel
case SERIAL:
return quorumFor(keyspace);
case ALL:
- return keyspace.getReplicationStrategy().getReplicationFactor();
+ return keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
case LOCAL_QUORUM:
case LOCAL_SERIAL:
return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter());
@@ -138,6 +141,28 @@ public enum ConsistencyLevel
}
}
+ /**
+ * Number of write acknowledgements to wait for: {@link #blockFor(Keyspace)} plus the pending replicas
+ * that count toward this level. DC-local levels only count pending replicas in the local datacenter;
+ * ANY, and any level not listed in the switch, adds nothing.
+ */
+ public int blockForWrite(Keyspace keyspace, Endpoints<?> pending)
+ {
+ assert pending != null;
+
+ int base = blockFor(keyspace);
+ int pendingToCount;
+ switch (this)
+ {
+ case LOCAL_ONE:
+ case LOCAL_QUORUM:
+ case LOCAL_SERIAL:
+ // DC-local levels only offer local guarantees, so only local pending replicas are waited on
+ pendingToCount = countDCLocalReplicas(pending).allReplicas();
+ break;
+ case ONE:
+ case TWO:
+ case THREE:
+ case QUORUM:
+ case EACH_QUORUM:
+ case SERIAL:
+ case ALL:
+ pendingToCount = pending.size();
+ break;
+ default:
+ // ANY, and any level not listed above, does not wait on pending replicas
+ pendingToCount = 0;
+ break;
+ }
+ return base + pendingToCount;
+ }
+
/**
* Determine if this consistency level meets or exceeds the consistency requirements of the given cl for the given keyspace
*/
@@ -156,40 +181,75 @@ public enum ConsistencyLevel
return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint));
}
- public int countLocalEndpoints(Iterable<InetAddressAndPort> liveEndpoints)
+ /**
+ * Whether {@code replica}'s endpoint is in the local datacenter; delegates to the endpoint-based
+ * {@code isLocal} overload.
+ */
+ public static boolean isLocal(Replica replica)
+ {
+ return isLocal(replica.endpoint());
+ }
+
+ /**
+ * Counts the replicas in {@code liveReplicas} whose endpoint is in the local datacenter,
+ * tallied separately as full and transient replicas.
+ */
+ private static ReplicaCount countDCLocalReplicas(ReplicaCollection<?> liveReplicas)
{
- int count = 0;
- for (InetAddressAndPort endpoint : liveEndpoints)
- if (isLocal(endpoint))
- count++;
+ ReplicaCount count = new ReplicaCount();
+ for (Replica replica : liveReplicas)
+ if (isLocal(replica))
+ count.increment(replica);
return count;
}
- private Map<String, Integer> countPerDCEndpoints(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints)
+ /**
+ * Mutable tally of replicas, split by kind into full replicas and transient replicas.
+ */
+ private static class ReplicaCount
+ {
+ int fullReplicas;
+ int transientReplicas;
+
+ /** Total number of replicas counted, full and transient together. */
+ int allReplicas()
+ {
+ int total = fullReplicas;
+ total += transientReplicas;
+ return total;
+ }
+
+ /** Records {@code replica} in the full tally when {@link Replica#isFull()} holds, otherwise in the transient tally. */
+ void increment(Replica replica)
+ {
+ if (!replica.isFull())
+ {
+ transientReplicas++;
+ return;
+ }
+ fullReplicas++;
+ }
+
+ /**
+ * Whether at least {@code fullReplicas} full replicas, and at least {@code allReplicas}
+ * replicas in total, have been counted.
+ */
+ boolean isSufficient(int allReplicas, int fullReplicas)
+ {
+ if (this.fullReplicas < fullReplicas)
+ return false;
+ return this.allReplicas() >= allReplicas;
+ }
+ }
+
+ /**
+ * Counts {@code liveReplicas} per datacenter, split into full and transient replicas. Every datacenter
+ * returned by the keyspace's strategy gets an entry, starting from an empty count.
+ * The replication strategy is cast to NetworkTopologyStrategy unconditionally, so callers must only use
+ * this for NetworkTopologyStrategy keyspaces.
+ */
+ private static Map<String, ReplicaCount> countPerDCEndpoints(Keyspace keyspace, Iterable<Replica> liveReplicas)
{
NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
- Map<String, Integer> dcEndpoints = new HashMap<String, Integer>();
+ Map<String, ReplicaCount> dcEndpoints = new HashMap<>();
for (String dc: strategy.getDatacenters())
- dcEndpoints.put(dc, 0);
+ dcEndpoints.put(dc, new ReplicaCount());
- for (InetAddressAndPort endpoint : liveEndpoints)
+ for (Replica replica : liveReplicas)
{
- String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint);
- dcEndpoints.put(dc, dcEndpoints.get(dc) + 1);
+ String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
+ // NOTE(review): get(dc) returns null, and this line throws NullPointerException, for a replica whose DC
+ // is not among strategy.getDatacenters(); presumably callers never pass such replicas — confirm
+ dcEndpoints.get(dc).increment(replica);
}
return dcEndpoints;
}
- public List<InetAddressAndPort> filterForQuery(Keyspace keyspace, List<InetAddressAndPort> liveEndpoints)
+ /**
+ * Equivalent to {@code filterForQuery(keyspace, liveReplicas, false)}: selects the replicas to contact
+ * without an additional speculative replica.
+ */
+ public <E extends Endpoints<E>> E filterForQuery(Keyspace keyspace, E liveReplicas)
+ {
+ return filterForQuery(keyspace, liveReplicas, false);
+ }
+
+ /**
+ * Selects the replicas to contact for a query at this consistency level.
+ * EACH_QUORUM on a NetworkTopologyStrategy keyspace is delegated to {@code filterForEachQuorum} and ignores
+ * {@code alwaysSpeculate}. Otherwise the first {@code blockFor(keyspace)} replicas are returned, plus one more
+ * when {@code alwaysSpeculate} is set, capped at the number of replicas available. When {@code isDCLocal}
+ * is set, the replicas are first re-sorted with {@code DatabaseDescriptor.getLocalComparator()}.
+ */
+ public <E extends Endpoints<E>> E filterForQuery(Keyspace keyspace, E liveReplicas, boolean alwaysSpeculate)
{
/*
* If we are doing an each quorum query, we have to make sure that the endpoints we select
* provide a quorum for each data center. If we are not using a NetworkTopologyStrategy,
* we should fall through and grab a quorum in the replication strategy.
+ *
+ * We do not speculate for EACH_QUORUM.
*/
if (this == EACH_QUORUM && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
- return filterForEachQuorum(keyspace, liveEndpoints);
+ return filterForEachQuorum(keyspace, liveReplicas);
/*
* Endpoints are expected to be restricted to live replicas, sorted by snitch preference.
@@ -198,36 +258,34 @@ public enum ConsistencyLevel
* the blockFor first ones).
*/
if (isDCLocal)
- liveEndpoints.sort(DatabaseDescriptor.getLocalComparator());
+ liveReplicas = liveReplicas.sorted(DatabaseDescriptor.getLocalComparator());
- return liveEndpoints.subList(0, Math.min(liveEndpoints.size(), blockFor(keyspace)));
+ // alwaysSpeculate asks for one replica beyond blockFor, still bounded by the replicas available
+ return liveReplicas.subList(0, Math.min(liveReplicas.size(), blockFor(keyspace) + (alwaysSpeculate ? 1 : 0)));
}
- private List<InetAddressAndPort> filterForEachQuorum(Keyspace keyspace, List<InetAddressAndPort> liveEndpoints)
+ /**
+ * Keeps, in the iteration order of {@code liveReplicas}, at most {@code localQuorumFor(keyspace, dc)} replicas
+ * from each datacenter of the keyspace's NetworkTopologyStrategy, and drops the rest.
+ */
+ private <E extends Endpoints<E>> E filterForEachQuorum(Keyspace keyspace, E liveReplicas)
{
NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
-
- Map<String, List<InetAddressAndPort>> dcsEndpoints = new HashMap<>();
- for (String dc: strategy.getDatacenters())
- dcsEndpoints.put(dc, new ArrayList<>());
-
- for (InetAddressAndPort add : liveEndpoints)
+ // per-DC count of replicas that may still be selected
+ Map<String, Integer> dcsReplicas = new HashMap<>();
+ for (String dc : strategy.getDatacenters())
{
- String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(add);
- dcsEndpoints.get(dc).add(add);
+ // each DC's quota is its local quorum; fewer replicas are kept when fewer are live there
+ dcsReplicas.put(dc, localQuorumFor(keyspace, dc));
}
- List<InetAddressAndPort> waitSet = new ArrayList<>();
- for (Map.Entry<String, List<InetAddressAndPort>> dcEndpoints : dcsEndpoints.entrySet())
- {
- List<InetAddressAndPort> dcEndpoint = dcEndpoints.getValue();
- waitSet.addAll(dcEndpoint.subList(0, Math.min(localQuorumFor(keyspace, dcEndpoints.getKey()), dcEndpoint.size())));
- }
-
- return waitSet;
+ // NOTE(review): this predicate is stateful (it decrements the DC quota); it assumes Endpoints.filter
+ // evaluates it exactly once per replica, in iteration order — confirm
+ return liveReplicas.filter((replica) -> {
+ String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
+ // NOTE(review): unboxing throws NullPointerException for a replica whose DC is not in strategy.getDatacenters()
+ int replicas = dcsReplicas.get(dc);
+ if (replicas > 0)
+ {
+ dcsReplicas.put(dc, --replicas);
+ return true;
+ }
+ return false;
+ });
}
- public boolean isSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints)
+ /**
+ * Whether {@code liveReplicas} are enough to serve a read at this consistency level. The LOCAL_ONE,
+ * LOCAL_QUORUM, EACH_QUORUM and default checks all require at least one live full replica in addition
+ * to the replica count.
+ */
+ public boolean isSufficientLiveNodesForRead(Keyspace keyspace, Endpoints<?> liveReplicas)
{
switch (this)
{
@@ -235,75 +293,92 @@ public enum ConsistencyLevel
// local hint is acceptable, and local node is always live
return true;
case LOCAL_ONE:
- return countLocalEndpoints(liveEndpoints) >= 1;
+ // one live local replica, which must be full
+ return countDCLocalReplicas(liveReplicas).isSufficient(1, 1);
case LOCAL_QUORUM:
- return countLocalEndpoints(liveEndpoints) >= blockFor(keyspace);
+ // blockFor live local replicas, at least one of them full
+ return countDCLocalReplicas(liveReplicas).isSufficient(blockFor(keyspace), 1);
case EACH_QUORUM:
if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
{
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
+ int fullCount = 0;
+ for (Map.Entry<String, ReplicaCount> entry : countPerDCEndpoints(keyspace, liveReplicas).entrySet())
{
- if (entry.getValue() < localQuorumFor(keyspace, entry.getKey()))
+ ReplicaCount count = entry.getValue();
+ if (!count.isSufficient(localQuorumFor(keyspace, entry.getKey()), 0))
return false;
+ fullCount += count.fullReplicas;
}
- return true;
+ // every DC reached its local quorum; additionally require at least one full replica across all DCs
+ return fullCount > 0;
}
// Fallthough on purpose for SimpleStrategy
default:
- return Iterables.size(liveEndpoints) >= blockFor(keyspace);
+ // blockFor live replicas overall, at least one of them full
+ return liveReplicas.size() >= blockFor(keyspace)
+ && Replicas.countFull(liveReplicas) > 0;
}
}
- public void assureSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints) throws UnavailableException
+ /**
+ * Throws {@link UnavailableException} unless {@code liveReplicas} pass {@code assureSufficientLiveNodes}
+ * with a requirement of {@link #blockFor(Keyspace)} replicas, at least one of them full.
+ */
+ public void assureSufficientLiveNodesForRead(Keyspace keyspace, Endpoints<?> liveReplicas) throws UnavailableException
+ {
+ int required = blockFor(keyspace);
+ int requiredFull = 1; // reads need at least one full replica
+ assureSufficientLiveNodes(keyspace, liveReplicas, required, requiredFull);
+ }
+ /**
+ * Throws {@link UnavailableException} unless {@code allLive} pass {@code assureSufficientLiveNodes} with a
+ * requirement of {@code blockForWrite(keyspace, pendingWithDown)} replicas and no full-replica minimum.
+ */
+ public void assureSufficientLiveNodesForWrite(Keyspace keyspace, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
+ {
+ int required = blockForWrite(keyspace, pendingWithDown);
+ assureSufficientLiveNodes(keyspace, allLive, required, 0);
+ }
+ /**
+ * Throws {@link UnavailableException} unless {@code allLive} satisfies this consistency level:
+ * ANY always passes; LOCAL_ONE and LOCAL_QUORUM need {@code blockFor} live replicas in the local datacenter,
+ * {@code blockForFullReplicas} of them full; EACH_QUORUM on a NetworkTopologyStrategy keyspace needs a local
+ * quorum of live replicas in every datacenter plus {@code blockForFullReplicas} full replicas in total;
+ * every other case needs {@code blockFor} live replicas in {@code allLive}, {@code blockForFullReplicas} of them full.
+ *
+ * @param blockFor number of live replicas required (EACH_QUORUM checks per-DC quorums instead)
+ * @param blockForFullReplicas number of live full replicas required
+ * @throws UnavailableException if the requirement is not met; arguments to UnavailableException.create are
+ * passed as (required, requiredFull, alive, aliveFull)
+ */
+ public void assureSufficientLiveNodes(Keyspace keyspace, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException
{
- int blockFor = blockFor(keyspace);
switch (this)
{
case ANY:
// local hint is acceptable, and local node is always live
break;
case LOCAL_ONE:
- if (countLocalEndpoints(liveEndpoints) == 0)
- throw new UnavailableException(this, 1, 0);
+ {
+ ReplicaCount localLive = countDCLocalReplicas(allLive);
+ // report the requirement actually checked: for writes blockFor includes local pending replicas and can exceed 1
+ if (!localLive.isSufficient(blockFor, blockForFullReplicas))
+ throw UnavailableException.create(this, blockFor, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas);
break;
+ }
case LOCAL_QUORUM:
- int localLive = countLocalEndpoints(liveEndpoints);
- if (localLive < blockFor)
+ {
+ ReplicaCount localLive = countDCLocalReplicas(allLive);
+ if (!localLive.isSufficient(blockFor, blockForFullReplicas))
{
if (logger.isTraceEnabled())
{
- StringBuilder builder = new StringBuilder("Local replicas [");
- for (InetAddressAndPort endpoint : liveEndpoints)
- {
- if (isLocal(endpoint))
- builder.append(endpoint).append(",");
- }
- builder.append("] are insufficient to satisfy LOCAL_QUORUM requirement of ").append(blockFor).append(" live nodes in '").append(DatabaseDescriptor.getLocalDataCenter()).append("'");
- logger.trace(builder.toString());
+ logger.trace(String.format("Local replicas %s are insufficient to satisfy LOCAL_QUORUM requirement of %d live replicas and %d full replicas in '%s'",
+ allLive.filter(ConsistencyLevel::isLocal), blockFor, blockForFullReplicas, DatabaseDescriptor.getLocalDataCenter()));
}
- throw new UnavailableException(this, blockFor, localLive);
+ throw UnavailableException.create(this, blockFor, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas);
}
break;
+ }
case EACH_QUORUM:
if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
{
- for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
+ int total = 0;
+ int totalFull = 0;
+ for (Map.Entry<String, ReplicaCount> entry : countPerDCEndpoints(keyspace, allLive).entrySet())
{
int dcBlockFor = localQuorumFor(keyspace, entry.getKey());
- int dcLive = entry.getValue();
- if (dcLive < dcBlockFor)
- throw new UnavailableException(this, entry.getKey(), dcBlockFor, dcLive);
+ ReplicaCount dcCount = entry.getValue();
+ // per-DC requirement: dcBlockFor replicas, no full minimum (0); report the DC's actual live counts
+ if (!dcCount.isSufficient(dcBlockFor, 0))
+ throw UnavailableException.create(this, entry.getKey(), dcBlockFor, 0, dcCount.allReplicas(), dcCount.fullReplicas);
+ totalFull += dcCount.fullReplicas;
+ total += dcCount.allReplicas();
}
+ // every DC has its quorum, so only the full-replica total can be short here
+ if (totalFull < blockForFullReplicas)
+ throw UnavailableException.create(this, blockFor, blockForFullReplicas, total, totalFull);
break;
}
// Fallthough on purpose for SimpleStrategy
default:
- int live = Iterables.size(liveEndpoints);
- if (live < blockFor)
+ int live = allLive.size();
+ int full = Replicas.countFull(allLive);
+ if (live < blockFor || full < blockForFullReplicas)
{
if (logger.isTraceEnabled())
- logger.trace("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(liveEndpoints), blockFor);
- throw new UnavailableException(this, blockFor, live);
+ logger.trace("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(allLive), blockFor);
+ throw UnavailableException.create(this, blockFor, blockForFullReplicas, live, full);
}
break;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org