You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:36:10 UTC

[18/18] cassandra git commit: Transient Replication and Cheap Quorums

Transient Replication and Cheap Quorums

Patch by Blake Eggleston, Benedict Elliott Smith, Marcus Eriksson, Alex Petrov, Ariel Weisberg; Reviewed by Blake Eggleston, Marcus Eriksson, Benedict Elliott Smith, Alex Petrov, Ariel Weisberg for CASSANDRA-14404

Co-authored-by: Blake Eggleston <bd...@gmail.com>
Co-authored-by: Benedict Elliott Smith <be...@apache.org>
Co-authored-by: Marcus Eriksson <ma...@apache.org>
Co-authored-by: Alex Petrov <ol...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f7431b43
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f7431b43
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f7431b43

Branch: refs/heads/trunk
Commit: f7431b432875e334170ccdb19934d05545d2cebd
Parents: 5b645de
Author: Ariel Weisberg <ar...@weisberg.ws>
Authored: Thu Jul 5 18:10:40 2018 -0400
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Fri Aug 31 21:34:22 2018 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   4 +
 conf/cassandra.yaml                             |   4 +
 doc/source/architecture/dynamo.rst              |  29 +
 doc/source/cql/ddl.rst                          |  14 +-
 ...iver-internal-only-3.12.0.post0-5838e2fd.zip | Bin 0 -> 269418 bytes
 pylib/cqlshlib/cql3handling.py                  |   1 +
 pylib/cqlshlib/cqlshhandling.py                 |   1 +
 pylib/cqlshlib/test/test_cqlsh_completion.py    |   6 +-
 pylib/cqlshlib/test/test_cqlsh_output.py        |   3 +-
 .../cassandra/batchlog/BatchlogManager.java     |  45 +-
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |  35 +-
 .../apache/cassandra/cql3/QueryProcessor.java   |  13 +-
 .../cql3/statements/BatchStatement.java         |   4 +-
 .../cql3/statements/BatchUpdatesCollector.java  |   2 +-
 .../cql3/statements/ModificationStatement.java  |   4 +-
 .../statements/SingleTableUpdatesCollector.java |   2 +-
 .../cql3/statements/UpdatesCollector.java       |   5 +-
 .../schema/AlterKeyspaceStatement.java          |  86 +-
 .../statements/schema/AlterTableStatement.java  |   7 +
 .../statements/schema/CreateIndexStatement.java |   5 +
 .../statements/schema/CreateTableStatement.java |   9 +
 .../statements/schema/CreateViewStatement.java  |   5 +
 .../cql3/statements/schema/TableAttributes.java |   3 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  24 +-
 .../apache/cassandra/db/ConsistencyLevel.java   | 211 +++--
 .../cassandra/db/DiskBoundaryManager.java       |  39 +-
 src/java/org/apache/cassandra/db/Memtable.java  |   1 +
 .../cassandra/db/MutationVerbHandler.java       |   5 +-
 .../cassandra/db/PartitionRangeReadCommand.java |  28 +-
 .../org/apache/cassandra/db/ReadCommand.java    |  33 +-
 .../apache/cassandra/db/SSTableImporter.java    |   6 +-
 .../db/SinglePartitionReadCommand.java          |  26 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |  98 ++-
 .../cassandra/db/SystemKeyspaceMigrator40.java  |  45 +
 .../org/apache/cassandra/db/TableCQLHelper.java |   1 +
 .../compaction/AbstractCompactionStrategy.java  |   3 +-
 .../db/compaction/AbstractStrategyHolder.java   |   7 +-
 .../db/compaction/CompactionManager.java        | 295 ++++---
 .../db/compaction/CompactionStrategyHolder.java |  34 +-
 .../compaction/CompactionStrategyManager.java   | 108 +--
 .../cassandra/db/compaction/CompactionTask.java |  26 +-
 .../db/compaction/PendingRepairHolder.java      |  42 +-
 .../db/compaction/PendingRepairManager.java     |  45 +-
 .../cassandra/db/compaction/Scrubber.java       |   4 +-
 .../cassandra/db/compaction/Upgrader.java       |  10 +-
 .../cassandra/db/compaction/Verifier.java       |   3 +-
 .../writers/CompactionAwareWriter.java          |   2 +
 .../writers/DefaultCompactionWriter.java        |   1 +
 .../writers/MajorLeveledCompactionWriter.java   |   1 +
 .../writers/MaxSSTableSizeWriter.java           |   1 +
 .../SplittingSizeTieredCompactionWriter.java    |   1 +
 .../db/partitions/PartitionIterators.java       |  12 -
 .../repair/CassandraKeyspaceRepairManager.java  |  10 +-
 .../db/repair/PendingAntiCompaction.java        |  22 +-
 .../db/streaming/CassandraOutgoingFile.java     |  11 +-
 .../db/streaming/CassandraStreamManager.java    |  36 +-
 .../db/streaming/CassandraStreamReader.java     |   2 +-
 .../apache/cassandra/db/view/TableViews.java    |   5 +
 .../apache/cassandra/db/view/ViewBuilder.java   |  19 +-
 .../apache/cassandra/db/view/ViewManager.java   |   2 +-
 .../org/apache/cassandra/db/view/ViewUtils.java |  64 +-
 src/java/org/apache/cassandra/dht/Range.java    |  27 +-
 .../cassandra/dht/RangeFetchMapCalculator.java  |  58 +-
 .../org/apache/cassandra/dht/RangeStreamer.java | 571 ++++++++----
 src/java/org/apache/cassandra/dht/Splitter.java |  95 +-
 .../apache/cassandra/dht/StreamStateStore.java  |  25 +-
 .../ReplicationAwareTokenAllocator.java         |   2 +-
 .../dht/tokenallocator/TokenAllocation.java     |   6 +-
 .../exceptions/UnavailableException.java        |  20 +-
 .../org/apache/cassandra/gms/EndpointState.java |   5 +
 .../apache/cassandra/hints/HintsService.java    |  21 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |   3 +-
 .../apache/cassandra/io/sstable/SSTable.java    |  13 +
 .../cassandra/io/sstable/SSTableLoader.java     |   2 +-
 .../cassandra/io/sstable/SSTableTxnWriter.java  |  14 +-
 .../io/sstable/SimpleSSTableMultiWriter.java    |   3 +-
 .../sstable/format/RangeAwareSSTableWriter.java |   8 +-
 .../io/sstable/format/SSTableReader.java        |   5 +
 .../io/sstable/format/SSTableWriter.java        |  17 +-
 .../cassandra/io/sstable/format/Version.java    |   2 +
 .../io/sstable/format/big/BigFormat.java        |  17 +-
 .../io/sstable/format/big/BigTableWriter.java   |   3 +-
 .../sstable/metadata/IMetadataSerializer.java   |   4 +-
 .../io/sstable/metadata/MetadataCollector.java  |   8 +-
 .../io/sstable/metadata/MetadataSerializer.java |   4 +-
 .../io/sstable/metadata/StatsMetadata.java      |  52 +-
 .../locator/AbstractEndpointSnitch.java         |  38 +-
 .../locator/AbstractNetworkTopologySnitch.java  |   5 +-
 .../locator/AbstractReplicaCollection.java      | 264 ++++++
 .../locator/AbstractReplicationStrategy.java    | 142 +--
 .../locator/DynamicEndpointSnitch.java          |  67 +-
 .../org/apache/cassandra/locator/Ec2Snitch.java |   2 +-
 .../org/apache/cassandra/locator/Endpoints.java | 157 ++++
 .../cassandra/locator/EndpointsByRange.java     |  63 ++
 .../cassandra/locator/EndpointsByReplica.java   |  61 ++
 .../cassandra/locator/EndpointsForRange.java    | 188 ++++
 .../cassandra/locator/EndpointsForToken.java    | 172 ++++
 .../cassandra/locator/IEndpointSnitch.java      |  18 +-
 .../cassandra/locator/InetAddressAndPort.java   |   5 +-
 .../apache/cassandra/locator/LocalStrategy.java |  29 +-
 .../locator/NetworkTopologyStrategy.java        |  87 +-
 .../locator/OldNetworkTopologyStrategy.java     |  40 +-
 .../cassandra/locator/PendingRangeMaps.java     | 161 ++--
 .../cassandra/locator/RangesAtEndpoint.java     | 313 +++++++
 .../cassandra/locator/RangesByEndpoint.java     |  54 ++
 .../org/apache/cassandra/locator/Replica.java   | 196 +++++
 .../cassandra/locator/ReplicaCollection.java    | 160 ++++
 .../apache/cassandra/locator/ReplicaLayout.java | 381 ++++++++
 .../cassandra/locator/ReplicaMultimap.java      | 127 +++
 .../org/apache/cassandra/locator/Replicas.java  |  83 ++
 .../cassandra/locator/ReplicationFactor.java    | 130 +++
 .../apache/cassandra/locator/SimpleSnitch.java  |   8 +-
 .../cassandra/locator/SimpleStrategy.java       |  37 +-
 .../cassandra/locator/SystemReplicas.java       |  62 ++
 .../apache/cassandra/locator/TokenMetadata.java | 102 ++-
 .../cassandra/metrics/KeyspaceMetrics.java      |  43 +-
 .../cassandra/metrics/ReadRepairMetrics.java    |   1 +
 .../apache/cassandra/metrics/TableMetrics.java  |  17 +-
 .../apache/cassandra/net/IAsyncCallback.java    |  11 +-
 .../apache/cassandra/net/MessagingService.java  |  38 +-
 .../apache/cassandra/net/WriteCallbackInfo.java |  15 +-
 .../cassandra/repair/AbstractSyncTask.java      |  31 +
 .../repair/AsymmetricLocalSyncTask.java         |   7 +-
 .../repair/AsymmetricRemoteSyncTask.java        |   6 +
 .../cassandra/repair/AsymmetricSyncTask.java    |  10 +-
 .../apache/cassandra/repair/CommonRange.java    |  82 ++
 .../cassandra/repair/KeyspaceRepairManager.java |   8 +-
 .../apache/cassandra/repair/LocalSyncTask.java  | 135 ---
 .../apache/cassandra/repair/RemoteSyncTask.java |  74 --
 .../org/apache/cassandra/repair/RepairJob.java  |  42 +-
 .../apache/cassandra/repair/RepairRunnable.java |  87 +-
 .../apache/cassandra/repair/RepairSession.java  |  57 +-
 .../cassandra/repair/StreamingRepairTask.java   |   8 +-
 .../repair/SymmetricLocalSyncTask.java          | 142 +++
 .../repair/SymmetricRemoteSyncTask.java         |  84 ++
 .../cassandra/repair/SymmetricSyncTask.java     |  94 ++
 .../org/apache/cassandra/repair/SyncTask.java   |  97 ---
 .../repair/SystemDistributedKeyspace.java       |   6 +-
 .../repair/consistent/LocalSessions.java        |  36 +-
 .../apache/cassandra/schema/KeyspaceParams.java |   5 +
 .../cassandra/schema/ReplicationParams.java     |   9 +-
 .../apache/cassandra/schema/SchemaKeyspace.java |   4 +
 .../apache/cassandra/schema/TableMetadata.java  |   6 +
 .../apache/cassandra/schema/TableParams.java    |  11 +
 .../service/AbstractWriteResponseHandler.java   |  95 +-
 .../cassandra/service/ActiveRepairService.java  |  52 +-
 .../service/BatchlogResponseHandler.java        |   2 +-
 .../DatacenterSyncWriteResponseHandler.java     |  21 +-
 .../service/DatacenterWriteResponseHandler.java |  26 +-
 .../service/PendingRangeCalculatorService.java  |   2 +-
 .../apache/cassandra/service/StorageProxy.java  | 672 ++++++---------
 .../cassandra/service/StorageService.java       | 860 ++++++++++++-------
 .../cassandra/service/StorageServiceMBean.java  |   2 +
 .../cassandra/service/WriteResponseHandler.java |  25 +-
 .../service/reads/AbstractReadExecutor.java     | 243 +++---
 .../cassandra/service/reads/DataResolver.java   |  83 +-
 .../cassandra/service/reads/DigestResolver.java |  79 +-
 .../cassandra/service/reads/ReadCallback.java   |  57 +-
 .../service/reads/ResponseResolver.java         |  32 +-
 .../reads/ShortReadPartitionsProtection.java    |  36 +-
 .../service/reads/ShortReadProtection.java      |   3 +-
 .../service/reads/ShortReadRowsProtection.java  |   6 +-
 .../reads/repair/AbstractReadRepair.java        |  90 +-
 .../reads/repair/BlockingPartitionRepair.java   |  73 +-
 .../reads/repair/BlockingReadRepair.java        |  29 +-
 .../reads/repair/BlockingReadRepairs.java       |  19 -
 .../service/reads/repair/NoopReadRepair.java    |  15 +-
 .../repair/PartitionIteratorMergeListener.java  |  14 +-
 .../reads/repair/ReadOnlyReadRepair.java        |  15 +-
 .../service/reads/repair/ReadRepair.java        |  39 +-
 .../reads/repair/ReadRepairDiagnostics.java     |   5 +-
 .../service/reads/repair/ReadRepairEvent.java   |  11 +-
 .../reads/repair/ReadRepairStrategy.java        |  13 +-
 .../reads/repair/RowIteratorMergeListener.java  |  58 +-
 .../streaming/DefaultConnectionFactory.java     |   7 +-
 .../apache/cassandra/streaming/StreamPlan.java  |  38 +-
 .../cassandra/streaming/StreamRequest.java      |  98 ++-
 .../cassandra/streaming/StreamSession.java      |  54 +-
 .../cassandra/streaming/TableStreamManager.java |   7 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   5 +
 .../org/apache/cassandra/tools/NodeTool.java    |   2 +
 .../tools/SSTableRepairedAtSetter.java          |   4 +-
 .../cassandra/tools/nodetool/GetReplicas.java   |  47 +
 .../apache/cassandra/tracing/TraceState.java    |   2 +-
 .../transport/messages/ErrorMessage.java        |   2 +-
 src/java/org/apache/cassandra/utils/Pair.java   |  12 +
 .../cassandra/utils/concurrent/Accumulator.java |  27 +-
 .../legacy_na_clust/na-1-big-CompressionInfo.db | Bin 87 -> 87 bytes
 .../legacy_na_clust/na-1-big-Data.db            | Bin 5259 -> 5214 bytes
 .../legacy_na_clust/na-1-big-Digest.crc32       |   2 +-
 .../legacy_na_clust/na-1-big-Index.db           | Bin 157553 -> 157553 bytes
 .../legacy_na_clust/na-1-big-Statistics.db      | Bin 7095 -> 7096 bytes
 .../legacy_na_clust/na-1-big-TOC.txt            |   8 +-
 .../na-1-big-CompressionInfo.db                 | Bin 79 -> 79 bytes
 .../legacy_na_clust_counter/na-1-big-Data.db    | Bin 5888 -> 5759 bytes
 .../na-1-big-Digest.crc32                       |   2 +-
 .../legacy_na_clust_counter/na-1-big-Index.db   | Bin 157553 -> 157553 bytes
 .../na-1-big-Statistics.db                      | Bin 7104 -> 7105 bytes
 .../legacy_na_clust_counter/na-1-big-TOC.txt    |   8 +-
 .../legacy_na_simple/na-1-big-Data.db           | Bin 89 -> 88 bytes
 .../legacy_na_simple/na-1-big-Digest.crc32      |   2 +-
 .../legacy_na_simple/na-1-big-Statistics.db     | Bin 4648 -> 4649 bytes
 .../legacy_na_simple/na-1-big-TOC.txt           |   8 +-
 .../legacy_na_simple_counter/na-1-big-Data.db   | Bin 140 -> 138 bytes
 .../na-1-big-Digest.crc32                       |   2 +-
 .../na-1-big-Statistics.db                      | Bin 4657 -> 4658 bytes
 .../legacy_na_simple_counter/na-1-big-TOC.txt   |   8 +-
 .../locator/DynamicEndpointSnitchLongTest.java  |  20 +-
 .../cassandra/streaming/LongStreamingTest.java  |  11 +-
 .../test/microbench/PendingRangesBench.java     |  27 +-
 test/unit/org/apache/cassandra/Util.java        |  20 +
 .../config/DatabaseDescriptorRefTest.java       |   5 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |   3 +-
 .../cql3/validation/operations/CreateTest.java  |   3 +-
 .../org/apache/cassandra/db/CleanupTest.java    |   9 +-
 .../cassandra/db/CleanupTransientTest.java      | 195 +++++
 .../org/apache/cassandra/db/ImportTest.java     |   2 +-
 .../db/RepairedDataTombstonesTest.java          |   2 +-
 .../apache/cassandra/db/RowUpdateBuilder.java   |   6 +
 .../unit/org/apache/cassandra/db/ScrubTest.java |   6 +-
 .../db/SystemKeyspaceMigrator40Test.java        |  26 +
 .../apache/cassandra/db/TableCQLHelperTest.java |   3 +
 .../org/apache/cassandra/db/VerifyTest.java     |   4 +-
 .../compaction/AbstractPendingRepairTest.java   |  13 +-
 .../db/compaction/AntiCompactionTest.java       | 234 +++--
 ...pactionStrategyManagerPendingRepairTest.java | 163 +++-
 .../CompactionStrategyManagerTest.java          |  50 +-
 .../db/compaction/CompactionTaskTest.java       |  10 +-
 .../db/compaction/CompactionsCQLTest.java       |   4 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 .../db/compaction/PendingRepairManagerTest.java |  28 +-
 .../db/compaction/SingleSSTableLCSTaskTest.java |   6 +-
 .../db/lifecycle/LogTransactionTest.java        |   2 +-
 .../db/lifecycle/RealTransactionsTest.java      |   1 +
 ...tionManagerGetSSTablesForValidationTest.java |   4 +-
 .../db/repair/PendingAntiCompactionTest.java    |  39 +-
 .../db/streaming/CassandraOutgoingFileTest.java |   1 +
 .../streaming/CassandraStreamManagerTest.java   |  16 +-
 .../db/streaming/StreamRequestTest.java         |  98 +++
 .../apache/cassandra/db/view/ViewUtilsTest.java |  23 +-
 .../apache/cassandra/dht/BootStrapperTest.java  |  34 +-
 .../dht/RangeFetchMapCalculatorTest.java        | 138 ++-
 .../org/apache/cassandra/dht/RangeTest.java     |  12 +-
 .../org/apache/cassandra/dht/SplitterTest.java  |  63 +-
 .../cassandra/dht/StreamStateStoreTest.java     |   5 +-
 .../gms/PendingRangeCalculatorServiceTest.java  |   2 +-
 .../io/sstable/BigTableWriterTest.java          |   2 +-
 .../io/sstable/CQLSSTableWriterTest.java        |   2 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   9 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java |   7 +-
 .../io/sstable/SSTableRewriterTest.java         |   2 +-
 .../cassandra/io/sstable/SSTableUtils.java      |   2 +-
 .../cassandra/io/sstable/SSTableWriterTest.java |  60 ++
 .../io/sstable/SSTableWriterTestBase.java       |  11 +-
 .../format/SSTableFlushObserverTest.java        |   2 +-
 .../metadata/MetadataSerializerTest.java        |   2 +-
 .../locator/DynamicEndpointSnitchTest.java      |  42 +-
 .../locator/NetworkTopologyStrategyTest.java    |  92 +-
 .../locator/OldNetworkTopologyStrategyTest.java |  28 +-
 .../cassandra/locator/PendingRangeMapsTest.java |  49 +-
 .../locator/ReplicaCollectionTest.java          | 468 ++++++++++
 .../apache/cassandra/locator/ReplicaUtils.java  |  44 +
 .../locator/ReplicationFactorTest.java          |  73 ++
 .../ReplicationStrategyEndpointCacheTest.java   |  56 +-
 .../cassandra/locator/SimpleStrategyTest.java   |  81 +-
 .../cassandra/locator/TokenMetadataTest.java    |   8 +-
 .../cassandra/net/WriteCallbackInfoTest.java    |   7 +-
 .../async/OutboundMessagingConnectionTest.java  |   3 +-
 .../cassandra/repair/LocalSyncTaskTest.java     | 191 ----
 .../cassandra/repair/RepairRunnableTest.java    |  12 +-
 .../cassandra/repair/RepairSessionTest.java     |   6 +-
 .../repair/SymmetricLocalSyncTaskTest.java      | 232 +++++
 .../repair/SymmetricRemoteSyncTaskTest.java     |  71 ++
 .../repair/consistent/LocalSessionAccessor.java |   3 +-
 .../repair/consistent/LocalSessionTest.java     |   9 +-
 .../org/apache/cassandra/schema/MockSchema.java |   2 +-
 .../service/ActiveRepairServiceTest.java        |  50 +-
 .../service/BootstrapTransientTest.java         | 179 ++++
 .../service/LeaveAndBootstrapTest.java          |  48 +-
 .../org/apache/cassandra/service/MoveTest.java  | 324 +++----
 .../cassandra/service/MoveTransientTest.java    | 638 ++++++++++++++
 .../cassandra/service/StorageServiceTest.java   | 148 ++++
 .../service/WriteResponseHandlerTest.java       |  58 +-
 .../WriteResponseHandlerTransientTest.java      | 224 +++++
 .../service/reads/AbstractReadResponseTest.java | 300 +++++++
 .../service/reads/DataResolverTest.java         | 486 +++++------
 .../reads/DataResolverTransientTest.java        | 226 +++++
 .../service/reads/DigestResolverTest.java       | 144 ++++
 .../service/reads/ReadExecutorTest.java         |  46 +-
 .../reads/repair/AbstractReadRepairTest.java    |  53 +-
 .../reads/repair/BlockingReadRepairTest.java    | 113 ++-
 .../DiagEventsBlockingReadRepairTest.java       |  45 +-
 .../reads/repair/InstrumentedReadRepair.java    |   4 +-
 .../reads/repair/ReadOnlyReadRepairTest.java    |  30 +-
 .../service/reads/repair/ReadRepairTest.java    | 353 ++++++++
 .../reads/repair/TestableReadRepair.java        |  50 +-
 .../streaming/StreamingTransferTest.java        |   7 +-
 .../utils/concurrent/AccumulatorTest.java       |   2 +-
 300 files changed, 11945 insertions(+), 4347 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9e76586..b53b986 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Transient Replication and Cheap Quorums (CASSANDRA-14404)
  * Log server-generated timestamp and nowInSeconds used by queries in FQL (CASSANDRA-14675)
  * Add diagnostic events for read repairs (CASSANDRA-14668)
  * Use consistent nowInSeconds and timestamps values within a request (CASSANDRA-14671)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index aa8281c..5066378 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,10 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - *Experimental* support for Transient Replication and Cheap Quorums introduced by CASSANDRA-14404
+     The intended audience for this functionality is expert users of Cassandra who are prepared
+     to validate every aspect of the database for their application and deployment practices. Future
+     releases of Cassandra will make this feature suitable for a wider audience.
    - *Experimental* support for Java 11 has been added. JVM options that differ between or are
      specific for Java 8 and 11 have been moved from jvm.options into jvm8.options and jvm11.options.
      IMPORTANT: Running C* on Java 11 is *experimental* and do it at your own risk.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 064ee4f..503a0fa 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1052,6 +1052,10 @@ enable_scripted_user_defined_functions: false
 # Materialized views are considered experimental and are not recommended for production use.
 enable_materialized_views: true
 
+# Enables creation of transiently replicated keyspaces on this node.
+# Transient replication is experimental and is not recommended for production use.
+#enable_transient_replication: true
+
 # The default Windows kernel timer and scheduling resolution is 15.6ms for power conservation.
 # Lowering this value on Windows can provide much tighter latency and better throughput, however
 # some virtualized environments may see a negative performance impact from changing this setting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/doc/source/architecture/dynamo.rst
----------------------------------------------------------------------
diff --git a/doc/source/architecture/dynamo.rst b/doc/source/architecture/dynamo.rst
index 365a695..12c586e 100644
--- a/doc/source/architecture/dynamo.rst
+++ b/doc/source/architecture/dynamo.rst
@@ -74,6 +74,35 @@ nodes in each rack, the data load on the smallest rack may be much higher.  Simi
 into a new rack, it will be considered a replica for the entire ring.  For this reason, many operators choose to
 configure all nodes on a single "rack".
 
+.. _transient-replication:
+
+Transient Replication
+~~~~~~~~~~~~~~~~~~~~~
+
+Transient replication allows you to configure a subset of replicas to only replicate data that hasn't been incrementally
+repaired. This allows you to decouple data redundancy from availability. For instance, if you have a keyspace replicated
+at rf 3, and alter it to rf 5 with 2 transient replicas, you go from being able to tolerate one failed replica to being
+able to tolerate two, without a corresponding increase in storage usage. This is because 3 nodes will replicate all the data
+for a given token range, and the other 2 will only replicate data that hasn't been incrementally repaired.
+
+To use transient replication, you first need to enable it in ``cassandra.yaml``. Once enabled, both SimpleStrategy and
+NetworkTopologyStrategy can be configured to transiently replicate data. You configure it by specifying replication factor
+as ``<total_replicas>/<transient_replicas>``. Both SimpleStrategy and NetworkTopologyStrategy support configuring transient
+replication.
+
+Transiently replicated keyspaces only support tables created with read_repair set to NONE and monotonic reads are not currently supported.
+You also can't use LWT, logged batches, and counters in 4.0. You will possibly never be able to use materialized views
+with transiently replicated keyspaces and probably never be able to use 2i with them.
+
+Transient replication is an experimental feature that may not be ready for production use. The expected audience is experienced
+users of Cassandra capable of fully validating a deployment of their particular application. That means being able to check
+that operations like reads, writes, decommission, remove, rebuild, repair, and replace all work with your queries, data,
+configuration, operational practices, and availability requirements.
+
+It is anticipated that 4.next will support monotonic reads with transient replication as well as LWT, logged batches, and
+counters.
+
+
 Tunable Consistency
 ^^^^^^^^^^^^^^^^^^^
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/doc/source/cql/ddl.rst
----------------------------------------------------------------------
diff --git a/doc/source/cql/ddl.rst b/doc/source/cql/ddl.rst
index 9afd638..2d9a50a 100644
--- a/doc/source/cql/ddl.rst
+++ b/doc/source/cql/ddl.rst
@@ -105,6 +105,14 @@ strategy is used. By default, Cassandra support the following ``'class'``:
 Attempting to create a keyspace that already exists will return an error unless the ``IF NOT EXISTS`` option is used. If
 it is used, the statement will be a no-op if the keyspace already exists.
 
+If :ref:`transient replication <transient-replication>` has been enabled, transient replicas can be configured for both
+SimpleStrategy and NetworkTopologyStrategy by defining replication factors in the format ``'<total_replicas>/<transient_replicas>'``.
+
+For instance, this keyspace will have 3 replicas in DC1, 1 of which is transient, and 5 replicas in DC2, 2 of which are transient::
+
+    CREATE KEYSPACE some_keyspace
+               WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1' : '3/1', 'DC2' : '5/2'};
+
 .. _use-statement:
 
 USE
@@ -455,6 +463,9 @@ A table supports the following options:
 | ``speculative_retry``          | *simple* | 99PERCENTILE| :ref:`Speculative retry options                           |
 |                                |          |             | <speculative-retry-options>`.                             |
 +--------------------------------+----------+-------------+-----------------------------------------------------------+
+| ``speculative_write_threshold``| *simple* | 99PERCENTILE| :ref:`Speculative retry options                           |
+|                                |          |             | <speculative-retry-options>`.                             |
++--------------------------------+----------+-------------+-----------------------------------------------------------+
 | ``gc_grace_seconds``           | *simple* | 864000      | Time to wait before garbage collecting tombstones         |
 |                                |          |             | (deletion markers).                                       |
 +--------------------------------+----------+-------------+-----------------------------------------------------------+
@@ -485,7 +496,8 @@ Speculative retry options
 By default, Cassandra read coordinators only query as many replicas as necessary to satisfy
 consistency levels: one for consistency level ``ONE``, a quorum for ``QUORUM``, and so on.
 ``speculative_retry`` determines when coordinators may query additional replicas, which is useful
-when replicas are slow or unresponsive.  The following are legal values (case-insensitive):
+when replicas are slow or unresponsive.  ``speculative_write_threshold`` specifies the threshold at which
+a cheap quorum write will be upgraded to include transient replicas.  The following are legal values (case-insensitive):
 
 ============================ ======================== =============================================================================
  Format                       Example                  Description

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/lib/cassandra-driver-internal-only-3.12.0.post0-5838e2fd.zip
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-internal-only-3.12.0.post0-5838e2fd.zip b/lib/cassandra-driver-internal-only-3.12.0.post0-5838e2fd.zip
new file mode 100644
index 0000000..8d627a9
Binary files /dev/null and b/lib/cassandra-driver-internal-only-3.12.0.post0-5838e2fd.zip differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index 5595e2a..405e88e 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -49,6 +49,7 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet):
         ('max_index_interval', None),
         ('default_time_to_live', None),
         ('speculative_retry', None),
+        ('speculative_write_threshold', None),
         ('memtable_flush_period_in_ms', None),
         ('cdc', None),
         ('read_repair', None),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/pylib/cqlshlib/cqlshhandling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cqlshhandling.py b/pylib/cqlshlib/cqlshhandling.py
index 9545876..7abd6ce 100644
--- a/pylib/cqlshlib/cqlshhandling.py
+++ b/pylib/cqlshlib/cqlshhandling.py
@@ -112,6 +112,7 @@ cqlsh_consistency_level_syntax_rules = r'''
                      | "SERIAL"
                      | "LOCAL_SERIAL"
                      | "LOCAL_ONE"
+                     | "NODE_LOCAL"
                      ;
 '''
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/pylib/cqlshlib/test/test_cqlsh_completion.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/test/test_cqlsh_completion.py b/pylib/cqlshlib/test/test_cqlsh_completion.py
index fa9490d..794c591 100644
--- a/pylib/cqlshlib/test/test_cqlsh_completion.py
+++ b/pylib/cqlshlib/test/test_cqlsh_completion.py
@@ -594,7 +594,7 @@ class TestCqlshCompletion(CqlshCompletionCase):
                                      'memtable_flush_period_in_ms',
                                      'CLUSTERING',
                                      'COMPACT', 'caching', 'comment',
-                                     'min_index_interval', 'speculative_retry', 'cdc'])
+                                     'min_index_interval', 'speculative_retry', 'speculative_write_threshold', 'cdc'])
         self.trycompletions(prefix + ' new_table (col_a int PRIMARY KEY) WITH ',
                             choices=['bloom_filter_fp_chance', 'compaction',
                                      'compression',
@@ -603,7 +603,7 @@ class TestCqlshCompletion(CqlshCompletionCase):
                                      'memtable_flush_period_in_ms',
                                      'CLUSTERING',
                                      'COMPACT', 'caching', 'comment',
-                                     'min_index_interval', 'speculative_retry', 'cdc'])
+                                     'min_index_interval', 'speculative_retry', 'speculative_write_threshold', 'cdc'])
         self.trycompletions(prefix + ' new_table (col_a int PRIMARY KEY) WITH bloom_filter_fp_chance ',
                             immediate='= ')
         self.trycompletions(prefix + ' new_table (col_a int PRIMARY KEY) WITH bloom_filter_fp_chance = ',
@@ -650,7 +650,7 @@ class TestCqlshCompletion(CqlshCompletionCase):
                                      'memtable_flush_period_in_ms',
                                      'CLUSTERING',
                                      'COMPACT', 'caching', 'comment',
-                                     'min_index_interval', 'speculative_retry', 'cdc'])
+                                     'min_index_interval', 'speculative_retry', 'speculative_write_threshold', 'cdc'])
         self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH compaction = "
                             + "{'class': 'DateTieredCompactionStrategy', '",
                             choices=['base_time_seconds', 'max_sstable_age_days',

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/pylib/cqlshlib/test/test_cqlsh_output.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/test/test_cqlsh_output.py b/pylib/cqlshlib/test/test_cqlsh_output.py
index 2f0d9bf..46546f0 100644
--- a/pylib/cqlshlib/test/test_cqlsh_output.py
+++ b/pylib/cqlshlib/test/test_cqlsh_output.py
@@ -622,7 +622,8 @@ class TestCqlshOutput(BaseTestCase):
                 AND max_index_interval = 2048
                 AND memtable_flush_period_in_ms = 0
                 AND min_index_interval = 128
-                AND speculative_retry = '99PERCENTILE';
+                AND speculative_retry = '99PERCENTILE'
+                AND speculative_write_threshold = '99PERCENTILE';
 
         """ % quote_name(get_keyspace()))
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 4809bd7..8dda54e 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -26,14 +26,20 @@ import java.util.concurrent.*;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicates;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.EndpointsForToken;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.*;
@@ -419,7 +425,7 @@ public class BatchlogManager implements BatchlogManagerMBean
                 if (handler != null)
                 {
                     hintedNodes.addAll(handler.undelivered);
-                    HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
+                    HintsService.instance.write(Collections2.transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
                                                 Hint.create(undeliveredMutation, writtenAt));
                 }
             }
@@ -449,35 +455,41 @@ public class BatchlogManager implements BatchlogManagerMBean
                                                                                      long writtenAt,
                                                                                      Set<InetAddressAndPort> hintedNodes)
         {
-            Set<InetAddressAndPort> liveEndpoints = new HashSet<>();
             String ks = mutation.getKeyspaceName();
+            Keyspace keyspace = Keyspace.open(ks);
             Token tk = mutation.key().getToken();
 
-            for (InetAddressAndPort endpoint : StorageService.instance.getNaturalAndPendingEndpoints(ks, tk))
+            EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(ks, tk);
+            Replicas.temporaryAssertFull(replicas); // TODO in CASSANDRA-14549
+
+            EndpointsForToken.Builder liveReplicasBuilder = EndpointsForToken.builder(tk);
+            for (Replica replica : replicas)
             {
-                if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
+                if (replica.isLocal())
                 {
                     mutation.apply();
                 }
-                else if (FailureDetector.instance.isAlive(endpoint))
+                else if (FailureDetector.instance.isAlive(replica.endpoint()))
                 {
-                    liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
+                    liveReplicasBuilder.add(replica); // will try delivering directly instead of writing a hint.
                 }
                 else
                 {
-                    hintedNodes.add(endpoint);
-                    HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
+                    hintedNodes.add(replica.endpoint());
+                    HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(replica.endpoint()),
                                                 Hint.create(mutation, writtenAt));
                 }
             }
 
-            if (liveEndpoints.isEmpty())
+            EndpointsForToken liveReplicas = liveReplicasBuilder.build();
+            if (liveReplicas.isEmpty())
                 return null;
 
-            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints, System.nanoTime());
+            Replicas.temporaryAssertFull(liveReplicas);
+            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(keyspace, liveReplicas, System.nanoTime());
             MessageOut<Mutation> message = mutation.createMessage();
-            for (InetAddressAndPort endpoint : liveEndpoints)
-                MessagingService.instance().sendRR(message, endpoint, handler, false);
+            for (Replica replica : liveReplicas)
+                MessagingService.instance().sendWriteRR(message, replica, handler, false);
             return handler;
         }
 
@@ -497,16 +509,17 @@ public class BatchlogManager implements BatchlogManagerMBean
         {
             private final Set<InetAddressAndPort> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
-            ReplayWriteResponseHandler(Collection<InetAddressAndPort> writeEndpoints, long queryStartNanoTime)
+            ReplayWriteResponseHandler(Keyspace keyspace, EndpointsForToken writeReplicas, long queryStartNanoTime)
             {
-                super(writeEndpoints, Collections.<InetAddressAndPort>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
-                undelivered.addAll(writeEndpoints);
+                super(ReplicaLayout.forWriteWithDownNodes(keyspace, null, writeReplicas.token(), writeReplicas, EndpointsForToken.empty(writeReplicas.token())),
+                      null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
+                Iterables.addAll(undelivered, writeReplicas.endpoints());
             }
 
             @Override
             protected int totalBlockFor()
             {
-                return this.naturalEndpoints.size();
+                return this.replicaLayout.selected().size();
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index a13070c..783dcc1 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -339,6 +339,8 @@ public class Config
 
     public boolean enable_materialized_views = true;
 
+    public boolean enable_transient_replication = false;
+
     /**
      * Optionally disable asynchronous UDF execution.
      * Disabling asynchronous UDF execution also implicitly disables the security-manager!

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index af13f9c..75b3fc3 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.locator.EndpointSnitchInfo;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.SeedProvider;
 import org.apache.cassandra.net.BackPressureStrategy;
 import org.apache.cassandra.net.RateBasedBackPressure;
@@ -122,7 +123,7 @@ public class DatabaseDescriptor
     private static long indexSummaryCapacityInMB;
 
     private static String localDC;
-    private static Comparator<InetAddressAndPort> localComparator;
+    private static Comparator<Replica> localComparator;
     private static EncryptionContext encryptionContext;
     private static boolean hasLoggedConfig;
 
@@ -991,18 +992,14 @@ public class DatabaseDescriptor
         EndpointSnitchInfo.create();
 
         localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
-        localComparator = new Comparator<InetAddressAndPort>()
-        {
-            public int compare(InetAddressAndPort endpoint1, InetAddressAndPort endpoint2)
-            {
-                boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
-                boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
-                if (local1 && !local2)
-                    return -1;
-                if (local2 && !local1)
-                    return 1;
-                return 0;
-            }
+        localComparator = (replica1, replica2) -> {
+            boolean local1 = localDC.equals(snitch.getDatacenter(replica1));
+            boolean local2 = localDC.equals(snitch.getDatacenter(replica2));
+            if (local1 && !local2)
+                return -1;
+            if (local2 && !local1)
+                return 1;
+            return 0;
         };
     }
 
@@ -2308,7 +2305,7 @@ public class DatabaseDescriptor
         return localDC;
     }
 
-    public static Comparator<InetAddressAndPort> getLocalComparator()
+    public static Comparator<Replica> getLocalComparator()
     {
         return localComparator;
     }
@@ -2459,6 +2456,16 @@ public class DatabaseDescriptor
         return conf.enable_materialized_views;
     }
 
+    public static boolean isTransientReplicationEnabled()
+    {
+        return conf.enable_transient_replication; // current value of the enable_transient_replication field on the loaded config
+    }
+
+    public static void setTransientReplicationEnabledUnsafe(boolean enabled)
+    {
+        conf.enable_transient_replication = enabled; // overwrites the in-memory config field directly; "Unsafe" suffix suggests test-only use - TODO confirm
+    }
+
     public static long getUserDefinedFunctionFailTimeout()
     {
         return conf.user_defined_function_fail_timeout;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 79e19c1..45db947 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -202,7 +202,18 @@ public class QueryProcessor implements QueryHandler
         statement.authorize(clientState);
         statement.validate(clientState);
 
-        ResultMessage result = statement.execute(queryState, options, queryStartNanoTime);
+        ResultMessage result;
+        if (options.getConsistency() == ConsistencyLevel.NODE_LOCAL)
+        {
+            assert Boolean.getBoolean("cassandra.enable_nodelocal_queries") : "Node local consistency level is highly dangerous and should be used only for debugging purposes";
+            assert statement instanceof SelectStatement : "Only SELECT statements are permitted for node-local execution";
+            logger.info("Statement {} executed with NODE_LOCAL consistency level.", statement);
+            result = statement.executeLocally(queryState, options);
+        }
+        else
+        {
+            result = statement.execute(queryState, options, queryStartNanoTime);
+        }
         return result == null ? new ResultMessage.Void() : result;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index e925735..fa637ef 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -261,7 +261,7 @@ public class BatchStatement implements CQLStatement
         return statements;
     }
 
-    private Collection<? extends IMutation> getMutations(BatchQueryOptions options,
+    private List<? extends IMutation> getMutations(BatchQueryOptions options,
                                                          boolean local,
                                                          long batchTimestamp,
                                                          int nowInSeconds,
@@ -401,7 +401,7 @@ public class BatchStatement implements CQLStatement
         return new ResultMessage.Void();
     }
 
-    private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException
+    private void executeWithoutConditions(List<? extends IMutation> mutations, ConsistencyLevel cl, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException
     {
         if (mutations.isEmpty())
             return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
index 96d9f5a..8f70ffc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
@@ -104,7 +104,7 @@ final class BatchUpdatesCollector implements UpdatesCollector
      * Returns a collection containing all the mutations.
      * @return a collection containing all the mutations.
      */
-    public Collection<IMutation> toMutations()
+    public List<IMutation> toMutations()
     {
         //TODO: The case where all statement where on the same keyspace is pretty common, optimize for that?
         List<IMutation> ms = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 13fc659..a8367f0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -465,7 +465,7 @@ public abstract class ModificationStatement implements CQLStatement
         else
             cl.validateForWrite(metadata.keyspace);
 
-        Collection<? extends IMutation> mutations =
+        List<? extends IMutation> mutations =
             getMutations(options,
                          false,
                          options.getTimestamp(queryState),
@@ -676,7 +676,7 @@ public abstract class ModificationStatement implements CQLStatement
      *
      * @return list of the mutations
      */
-    private Collection<? extends IMutation> getMutations(QueryOptions options,
+    private List<? extends IMutation> getMutations(QueryOptions options,
                                                          boolean local,
                                                          long timestamp,
                                                          int nowInSeconds,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
index 1def3fd..6ef551d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
@@ -82,7 +82,7 @@ final class SingleTableUpdatesCollector implements UpdatesCollector
      * Returns a collection containing all the mutations.
      * @return a collection containing all the mutations.
      */
-    public Collection<IMutation> toMutations()
+    public List<IMutation> toMutations()
     {
         List<IMutation> ms = new ArrayList<>();
         for (PartitionUpdate.Builder builder : puBuilders.values())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
index 30db7ca..c3dd334 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
@@ -18,17 +18,16 @@
 
 package org.apache.cassandra.cql3.statements;
 
-import java.util.Collection;
+import java.util.List;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.schema.TableMetadata;
 
 public interface UpdatesCollector
 {
     PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency);
-    Collection<IMutation> toMutations();
+    List<IMutation> toMutations();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
index 12e73d0..2f0c188 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
@@ -17,16 +17,27 @@
  */
 package org.apache.cassandra.cql3.statements.schema;
 
+import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.cassandra.audit.AuditLogContext;
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.locator.ReplicationFactor;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
 import org.apache.cassandra.schema.Keyspaces;
@@ -34,9 +45,13 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
+import org.apache.cassandra.utils.FBUtilities;
 
 public final class AlterKeyspaceStatement extends AlterSchemaStatement
 {
+    private static final boolean allow_alter_rf_during_range_movement = Boolean.getBoolean(Config.PROPERTY_PREFIX + "allow_alter_rf_during_range_movement");
+    private static final boolean allow_unsafe_transient_changes = Boolean.getBoolean(Config.PROPERTY_PREFIX + "allow_unsafe_transient_changes");
+
     private final KeyspaceAttributes attrs;
 
     public AlterKeyspaceStatement(String keyspaceName, KeyspaceAttributes attrs)
@@ -60,6 +75,9 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement
 
         newKeyspace.params.validate(keyspaceName);
 
+        validateNoRangeMovements();
+        validateTransientReplication(keyspace.createReplicationStrategy(), newKeyspace.createReplicationStrategy());
+
         return schema.withAddedOrUpdated(newKeyspace);
     }
 
@@ -84,11 +102,77 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement
         AbstractReplicationStrategy before = keyspaceDiff.before.createReplicationStrategy();
         AbstractReplicationStrategy after = keyspaceDiff.after.createReplicationStrategy();
 
-        return before.getReplicationFactor() < after.getReplicationFactor()
+        return before.getReplicationFactor().fullReplicas < after.getReplicationFactor().fullReplicas
              ? ImmutableSet.of("When increasing replication factor you need to run a full (-full) repair to distribute the data.")
              : ImmutableSet.of();
     }
 
+    private void validateNoRangeMovements()
+    {
+        // Unless overridden, every live or unreachable gossip member other than the local broadcast address must be in normal state.
+        if (!allow_alter_rf_during_range_movement)
+        {
+            List<InetAddressAndPort> offenders =
+                Stream.concat(Gossiper.instance.getLiveMembers().stream(), Gossiper.instance.getUnreachableMembers().stream())
+                      .filter(peer -> !(FBUtilities.getBroadcastAddressAndPort().equals(peer)
+                                        || Gossiper.instance.getEndpointStateForEndpoint(peer).isNormalState()))
+                      .collect(Collectors.toList());
+            if (!offenders.isEmpty())
+                throw new ConfigurationException("Cannot alter RF while some endpoints are not in normal state (no range movements): " + offenders);
+        }
+    }
+
+    /**
+     * Rejects replication changes that are unsafe or unsupported with transient replication (bypassed by allow_unsafe_transient_changes).
+     * @throws ConfigurationException if moving from oldStrategy's replication factor to newStrategy's is not permitted
+     */
+    private void validateTransientReplication(AbstractReplicationStrategy oldStrategy, AbstractReplicationStrategy newStrategy)
+    {
+        //If there is no read traffic there are some extra alterations you can safely make, but this is so atypical
+        //that a good default is to not allow unsafe changes
+        if (allow_unsafe_transient_changes)
+            return;
+
+        ReplicationFactor oldRF = oldStrategy.getReplicationFactor();
+        ReplicationFactor newRF = newStrategy.getReplicationFactor();
+
+        int oldTrans = oldRF.transientReplicas();
+        int oldFull = oldRF.fullReplicas;
+        int newTrans = newRF.transientReplicas();
+        int newFull = newRF.fullReplicas;
+
+        if (newTrans > 0)
+        {
+            if (DatabaseDescriptor.getNumTokens() > 1)
+                throw new ConfigurationException("Transient replication is not supported with vnodes yet");
+
+            Keyspace ks = Keyspace.open(keyspaceName);
+            for (ColumnFamilyStore cfs : ks.getColumnFamilyStores())
+            {
+                if (cfs.viewManager.hasViews())
+                    throw new ConfigurationException("Cannot use transient replication on keyspaces using materialized views");
+
+                if (cfs.indexManager.hasIndexes())
+                    throw new ConfigurationException("Cannot use transient replication on keyspaces using secondary indexes");
+            }
+        }
+
+        //This is true right now because the transition from transient -> full lacks the pending state
+        //necessary for correctness. What would happen if we allowed this is that we would attempt
+        //to read from a transient replica as if it were a full replica. Hence only *increases* in full replicas are rejected.
+        if (newFull > oldFull && oldTrans > 0)
+            throw new ConfigurationException("Can't add full replicas if there are any transient replicas. You must first remove all transient replicas, then change the # of full replicas, then add back the transient replicas");
+
+        //Don't increase transient replication factor by more than one at a time if changing number of replicas
+        //Just like with changing full replicas it's not safe to do this as you could read from too many replicas
+        //that don't have the necessary data. W/O transient replication this alteration was allowed and it's not clear
+        //if it should be.
+        //This is structured so you can convert as many full replicas to transient replicas as you want.
+        boolean numReplicasChanged = oldTrans + oldFull != newTrans + newFull;
+        if (numReplicasChanged && (newTrans > oldTrans && newTrans != oldTrans + 1))
+            throw new ConfigurationException("Can only safely increase number of transients one at a time with incremental repair run in between each time");
+    }
+
     @Override
     public AuditLogContext getAuditLogContext()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
index 3ec75b2..5044119 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -360,6 +361,12 @@ public abstract class AlterTableStatement extends AlterSchemaStatement
                           "before being replayed.");
             }
 
+            if (keyspace.createReplicationStrategy().hasTransientReplicas()
+                && params.readRepair != ReadRepairStrategy.NONE)
+            {
+                throw ire("read_repair must be set to 'NONE' for transiently replicated keyspaces");
+            }
+
             return keyspace.withSwapped(keyspace.tables.withSwapped(table.withSwapped(params)));
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
index df41358..dbca160 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
@@ -28,7 +28,9 @@ import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.QualifiedName;
 import org.apache.cassandra.cql3.statements.schema.IndexTarget.Type;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
@@ -88,6 +90,9 @@ public final class CreateIndexStatement extends AlterSchemaStatement
         if (table.isView())
             throw ire("Secondary indexes on materialized views aren't supported");
 
+        if (Keyspace.open(table.keyspace).getReplicationStrategy().hasTransientReplicas())
+            throw new InvalidRequestException("Secondary indexes are not supported on transiently replicated keyspaces");
+
         List<IndexTarget> indexTargets = Lists.newArrayList(transform(rawIndexTargets, t -> t.prepare(table)));
 
         if (indexTargets.isEmpty() && !attrs.isCustom)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
index 62fcafe..be7907f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
@@ -27,11 +27,14 @@ import org.apache.cassandra.auth.DataResource;
 import org.apache.cassandra.auth.IResource;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.AlreadyExistsException;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -98,6 +101,12 @@ public final class CreateTableStatement extends AlterSchemaStatement
         TableMetadata table = builder(keyspace.types).build();
         table.validate();
 
+        if (keyspace.createReplicationStrategy().hasTransientReplicas()
+            && table.params.readRepair != ReadRepairStrategy.NONE)
+        {
+            throw ire("read_repair must be set to 'NONE' for transiently replicated keyspaces");
+        }
+
         return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.with(table)));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
index 5f62001..bf6bcff 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
@@ -31,9 +31,11 @@ import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
 import org.apache.cassandra.cql3.selection.RawSelector;
 import org.apache.cassandra.cql3.selection.Selectable;
 import org.apache.cassandra.cql3.statements.StatementType;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ReversedType;
 import org.apache.cassandra.exceptions.AlreadyExistsException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
@@ -107,6 +109,9 @@ public final class CreateViewStatement extends AlterSchemaStatement
         if (null == keyspace)
             throw ire("Keyspace '%s' doesn't exist", keyspaceName);
 
+        if (keyspace.createReplicationStrategy().hasTransientReplicas())
+            throw new InvalidRequestException("Materialized views are not supported on transiently replicated keyspaces");
+
         TableMetadata table = keyspace.tables.getNullable(tableName);
         if (null == table)
             throw ire("Base table '%s' doesn't exist", tableName);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java b/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java
index c8e464a..4e66307 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/TableAttributes.java
@@ -128,6 +128,9 @@ public final class TableAttributes extends PropertyDefinitions
         if (hasOption(Option.SPECULATIVE_RETRY))
             builder.speculativeRetry(SpeculativeRetryPolicy.fromString(getString(Option.SPECULATIVE_RETRY)));
 
+        if (hasOption(Option.SPECULATIVE_WRITE_THRESHOLD))
+            builder.speculativeWriteThreshold(SpeculativeRetryPolicy.fromString(getString(Option.SPECULATIVE_WRITE_THRESHOLD)));
+
         if (hasOption(Option.CRC_CHECK_CHANCE))
             builder.crcCheckChance(getDouble(Option.CRC_CHECK_CHANCE));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 5e38584..56851e2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,7 +30,6 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
-import javax.annotation.Nullable;
 import javax.management.*;
 import javax.management.openmbean.*;
 
@@ -68,7 +67,6 @@ import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.KeyIterator;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -81,7 +79,6 @@ import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.repair.TableRepairManager;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
-import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.TableStreamManager;
@@ -205,7 +202,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     private final Directories directories;
 
     public final TableMetrics metric;
-    public volatile long sampleLatencyNanos;
+    public volatile long sampleReadLatencyNanos;
+    public volatile long transientWriteLatencyNanos;
 
     private final CassandraTableWriteHandler writeHandler;
     private final CassandraStreamManager streamManager;
@@ -384,7 +382,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         viewManager = keyspace.viewManager.forTable(metadata.id);
         metric = new TableMetrics(this);
         fileIndexGenerator.set(generation);
-        sampleLatencyNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getReadRpcTimeout() / 2);
+        sampleReadLatencyNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getReadRpcTimeout() / 2);
+        transientWriteLatencyNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getWriteRpcTimeout() / 2);
 
         logger.info("Initializing {}.{}", keyspace.getName(), name);
 
@@ -454,7 +453,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         try
         {
-            sampleLatencyNanos = metadata().params.speculativeRetry.calculateThreshold(metric.coordinatorReadLatency);
+            sampleReadLatencyNanos = metadata().params.speculativeRetry.calculateThreshold(metric.coordinatorReadLatency);
+            transientWriteLatencyNanos = metadata().params.speculativeWriteThreshold.calculateThreshold(metric.coordinatorWriteLatency);
         }
         catch (Throwable e)
         {
@@ -487,15 +487,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return directories;
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
     {
         MetadataCollector collector = new MetadataCollector(metadata().comparator).sstableLevel(sstableLevel);
-        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, collector, header, txn);
+        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, collector, header, txn);
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
     {
-        return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, metadataCollector, header, indexManager.listIndexes(), txn);
+        return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadataCollector, header, indexManager.listIndexes(), txn);
     }
 
     public boolean supportsEarlyOpen()
@@ -1402,7 +1402,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         // cleanup size estimation only counts bytes for keys local to this node
         long expectedFileSize = 0;
-        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
+        Collection<Range<Token>> ranges = StorageService.instance.getLocalReplicas(keyspace.getName()).ranges();
         for (SSTableReader sstable : sstables)
         {
             List<SSTableReader.PartitionPositionBounds> positions = sstable.getPositionsForRanges(ranges);
@@ -1677,7 +1677,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public void cleanupCache()
     {
-        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
+        Collection<Range<Token>> ranges = StorageService.instance.getLocalReplicas(keyspace.getName()).ranges();
 
         for (Iterator<RowCacheKey> keyIter = CacheService.instance.rowCache.keyIterator();
              keyIter.hasNext(); )

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index d37da0a..35ba198 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -17,16 +17,18 @@
  */
 package org.apache.cassandra.db;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.Iterables;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.Replicas;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -47,7 +49,8 @@ public enum ConsistencyLevel
     EACH_QUORUM (7),
     SERIAL      (8),
     LOCAL_SERIAL(9),
-    LOCAL_ONE   (10, true);
+    LOCAL_ONE   (10, true),
+    NODE_LOCAL  (11, true);
 
     private static final Logger logger = LoggerFactory.getLogger(ConsistencyLevel.class);
 
@@ -89,13 +92,13 @@ public enum ConsistencyLevel
 
     private int quorumFor(Keyspace keyspace)
     {
-        return (keyspace.getReplicationStrategy().getReplicationFactor() / 2) + 1;
+        return (keyspace.getReplicationStrategy().getReplicationFactor().allReplicas / 2) + 1;
     }
 
     private int localQuorumFor(Keyspace keyspace, String dc)
     {
         return (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
-             ? (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc) / 2) + 1
+             ? (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc).allReplicas / 2) + 1
              : quorumFor(keyspace);
     }
 
@@ -116,7 +119,7 @@ public enum ConsistencyLevel
             case SERIAL:
                 return quorumFor(keyspace);
             case ALL:
-                return keyspace.getReplicationStrategy().getReplicationFactor();
+                return keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
             case LOCAL_QUORUM:
             case LOCAL_SERIAL:
                 return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter());
@@ -138,6 +141,28 @@ public enum ConsistencyLevel
         }
     }
 
+    public int blockForWrite(Keyspace keyspace, Endpoints<?> pending)
+    {
+        assert pending != null;
+
+        int blockFor = blockFor(keyspace);
+        switch (this)
+        {
+            case ANY:
+                break;
+            case LOCAL_ONE: case LOCAL_QUORUM: case LOCAL_SERIAL:
+                // we will only count local replicas towards our response count, as these queries only care about local guarantees
+                blockFor += countDCLocalReplicas(pending).allReplicas();
+                break;
+            case ONE: case TWO: case THREE:
+            case QUORUM: case EACH_QUORUM:
+            case SERIAL:
+            case ALL:
+                blockFor += pending.size();
+        }
+        return blockFor;
+    }
+
     /**
      * Determine if this consistency level meets or exceeds the consistency requirements of the given cl for the given keyspace
      */
@@ -156,40 +181,75 @@ public enum ConsistencyLevel
         return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint));
     }
 
-    public int countLocalEndpoints(Iterable<InetAddressAndPort> liveEndpoints)
+    public static boolean isLocal(Replica replica)
+    {
+        return isLocal(replica.endpoint());
+    }
+
+    private static ReplicaCount countDCLocalReplicas(ReplicaCollection<?> liveReplicas)
     {
-        int count = 0;
-        for (InetAddressAndPort endpoint : liveEndpoints)
-            if (isLocal(endpoint))
-                count++;
+        ReplicaCount count = new ReplicaCount();
+        for (Replica replica : liveReplicas)
+            if (isLocal(replica))
+                count.increment(replica);
         return count;
     }
 
-    private Map<String, Integer> countPerDCEndpoints(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints)
+    private static class ReplicaCount
+    {
+        int fullReplicas;
+        int transientReplicas;
+
+        int allReplicas()
+        {
+            return fullReplicas + transientReplicas;
+        }
+
+        void increment(Replica replica)
+        {
+            if (replica.isFull()) ++fullReplicas;
+            else ++transientReplicas;
+        }
+
+        boolean isSufficient(int allReplicas, int fullReplicas)
+        {
+            return this.fullReplicas >= fullReplicas
+                    && this.allReplicas() >= allReplicas;
+        }
+    }
+
+    private static Map<String, ReplicaCount> countPerDCEndpoints(Keyspace keyspace, Iterable<Replica> liveReplicas)
     {
         NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
 
-        Map<String, Integer> dcEndpoints = new HashMap<String, Integer>();
+        Map<String, ReplicaCount> dcEndpoints = new HashMap<>();
         for (String dc: strategy.getDatacenters())
-            dcEndpoints.put(dc, 0);
+            dcEndpoints.put(dc, new ReplicaCount());
 
-        for (InetAddressAndPort endpoint : liveEndpoints)
+        for (Replica replica : liveReplicas)
         {
-            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint);
-            dcEndpoints.put(dc, dcEndpoints.get(dc) + 1);
+            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
+            dcEndpoints.get(dc).increment(replica);
         }
         return dcEndpoints;
     }
 
-    public List<InetAddressAndPort> filterForQuery(Keyspace keyspace, List<InetAddressAndPort> liveEndpoints)
+    public <E extends Endpoints<E>> E filterForQuery(Keyspace keyspace, E liveReplicas)
+    {
+        return filterForQuery(keyspace, liveReplicas, false);
+    }
+
+    public <E extends Endpoints<E>> E filterForQuery(Keyspace keyspace, E liveReplicas, boolean alwaysSpeculate)
     {
         /*
          * If we are doing an each quorum query, we have to make sure that the endpoints we select
          * provide a quorum for each data center. If we are not using a NetworkTopologyStrategy,
          * we should fall through and grab a quorum in the replication strategy.
+         *
+         * We do not speculate for EACH_QUORUM.
          */
         if (this == EACH_QUORUM && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
-            return filterForEachQuorum(keyspace, liveEndpoints);
+            return filterForEachQuorum(keyspace, liveReplicas);
 
         /*
          * Endpoints are expected to be restricted to live replicas, sorted by snitch preference.
@@ -198,36 +258,34 @@ public enum ConsistencyLevel
          * the blockFor first ones).
          */
         if (isDCLocal)
-            liveEndpoints.sort(DatabaseDescriptor.getLocalComparator());
+            liveReplicas = liveReplicas.sorted(DatabaseDescriptor.getLocalComparator());
 
-        return liveEndpoints.subList(0, Math.min(liveEndpoints.size(), blockFor(keyspace)));
+        return liveReplicas.subList(0, Math.min(liveReplicas.size(), blockFor(keyspace) + (alwaysSpeculate ? 1 : 0)));
     }
 
-    private List<InetAddressAndPort> filterForEachQuorum(Keyspace keyspace, List<InetAddressAndPort> liveEndpoints)
+    private <E extends Endpoints<E>> E filterForEachQuorum(Keyspace keyspace, E liveReplicas)
     {
         NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
-
-        Map<String, List<InetAddressAndPort>> dcsEndpoints = new HashMap<>();
-        for (String dc: strategy.getDatacenters())
-            dcsEndpoints.put(dc, new ArrayList<>());
-
-        for (InetAddressAndPort add : liveEndpoints)
+        Map<String, Integer> dcsReplicas = new HashMap<>();
+        for (String dc : strategy.getDatacenters())
         {
-            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(add);
-            dcsEndpoints.get(dc).add(add);
+            // per-DC cap: keep at most a local quorum's worth of replicas from each dc
+            dcsReplicas.put(dc, localQuorumFor(keyspace, dc));
         }
 
-        List<InetAddressAndPort> waitSet = new ArrayList<>();
-        for (Map.Entry<String, List<InetAddressAndPort>> dcEndpoints : dcsEndpoints.entrySet())
-        {
-            List<InetAddressAndPort> dcEndpoint = dcEndpoints.getValue();
-            waitSet.addAll(dcEndpoint.subList(0, Math.min(localQuorumFor(keyspace, dcEndpoints.getKey()), dcEndpoint.size())));
-        }
-
-        return waitSet;
+        return liveReplicas.filter((replica) -> {
+            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
+            int replicas = dcsReplicas.get(dc);
+            if (replicas > 0)
+            {
+                dcsReplicas.put(dc, --replicas);
+                return true;
+            }
+            return false;
+        });
     }
 
-    public boolean isSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints)
+    public boolean isSufficientLiveNodesForRead(Keyspace keyspace, Endpoints<?> liveReplicas)
     {
         switch (this)
         {
@@ -235,75 +293,92 @@ public enum ConsistencyLevel
                 // local hint is acceptable, and local node is always live
                 return true;
             case LOCAL_ONE:
-                return countLocalEndpoints(liveEndpoints) >= 1;
+                return countDCLocalReplicas(liveReplicas).isSufficient(1, 1);
             case LOCAL_QUORUM:
-                return countLocalEndpoints(liveEndpoints) >= blockFor(keyspace);
+                return countDCLocalReplicas(liveReplicas).isSufficient(blockFor(keyspace), 1);
             case EACH_QUORUM:
                 if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
                 {
-                    for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
+                    int fullCount = 0;
+                    for (Map.Entry<String, ReplicaCount> entry : countPerDCEndpoints(keyspace, liveReplicas).entrySet())
                     {
-                        if (entry.getValue() < localQuorumFor(keyspace, entry.getKey()))
+                        ReplicaCount count = entry.getValue();
+                        if (!count.isSufficient(localQuorumFor(keyspace, entry.getKey()), 0))
                             return false;
+                        fullCount += count.fullReplicas;
                     }
-                    return true;
+                    return fullCount > 0;
                 }
                 // Fallthough on purpose for SimpleStrategy
             default:
-                return Iterables.size(liveEndpoints) >= blockFor(keyspace);
+                return liveReplicas.size() >= blockFor(keyspace)
+                        && Replicas.countFull(liveReplicas) > 0;
         }
     }
 
-    public void assureSufficientLiveNodes(Keyspace keyspace, Iterable<InetAddressAndPort> liveEndpoints) throws UnavailableException
+    public void assureSufficientLiveNodesForRead(Keyspace keyspace, Endpoints<?> liveReplicas) throws UnavailableException
+    {
+        assureSufficientLiveNodes(keyspace, liveReplicas, blockFor(keyspace), 1);
+    }
+    public void assureSufficientLiveNodesForWrite(Keyspace keyspace, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
+    {
+        assureSufficientLiveNodes(keyspace, allLive, blockForWrite(keyspace, pendingWithDown), 0);
+    }
+    public void assureSufficientLiveNodes(Keyspace keyspace, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException
     {
-        int blockFor = blockFor(keyspace);
         switch (this)
         {
             case ANY:
                 // local hint is acceptable, and local node is always live
                 break;
             case LOCAL_ONE:
-                if (countLocalEndpoints(liveEndpoints) == 0)
-                    throw new UnavailableException(this, 1, 0);
+            {
+                ReplicaCount localLive = countDCLocalReplicas(allLive);
+                if (!localLive.isSufficient(blockFor, blockForFullReplicas))
+                    throw UnavailableException.create(this, 1, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas);
                 break;
+            }
             case LOCAL_QUORUM:
-                int localLive = countLocalEndpoints(liveEndpoints);
-                if (localLive < blockFor)
+            {
+                ReplicaCount localLive = countDCLocalReplicas(allLive);
+                if (!localLive.isSufficient(blockFor, blockForFullReplicas))
                 {
                     if (logger.isTraceEnabled())
                     {
-                        StringBuilder builder = new StringBuilder("Local replicas [");
-                        for (InetAddressAndPort endpoint : liveEndpoints)
-                        {
-                            if (isLocal(endpoint))
-                                builder.append(endpoint).append(",");
-                        }
-                        builder.append("] are insufficient to satisfy LOCAL_QUORUM requirement of ").append(blockFor).append(" live nodes in '").append(DatabaseDescriptor.getLocalDataCenter()).append("'");
-                        logger.trace(builder.toString());
+                        logger.trace(String.format("Local replicas %s are insufficient to satisfy LOCAL_QUORUM requirement of %d live replicas and %d full replicas in '%s'",
+                                allLive.filter(ConsistencyLevel::isLocal), blockFor, blockForFullReplicas, DatabaseDescriptor.getLocalDataCenter()));
                     }
-                    throw new UnavailableException(this, blockFor, localLive);
+                    throw UnavailableException.create(this, blockFor, blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas);
                 }
                 break;
+            }
             case EACH_QUORUM:
                 if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
                 {
-                    for (Map.Entry<String, Integer> entry : countPerDCEndpoints(keyspace, liveEndpoints).entrySet())
+                    int total = 0;
+                    int totalFull = 0;
+                    for (Map.Entry<String, ReplicaCount> entry : countPerDCEndpoints(keyspace, allLive).entrySet())
                     {
                         int dcBlockFor = localQuorumFor(keyspace, entry.getKey());
-                        int dcLive = entry.getValue();
-                        if (dcLive < dcBlockFor)
-                            throw new UnavailableException(this, entry.getKey(), dcBlockFor, dcLive);
+                        ReplicaCount dcCount = entry.getValue();
+                        if (!dcCount.isSufficient(dcBlockFor, 0))
+                            throw UnavailableException.create(this, entry.getKey(), dcBlockFor, dcCount.allReplicas(), 0, dcCount.fullReplicas);
+                        totalFull += dcCount.fullReplicas;
+                        total += dcCount.allReplicas();
                     }
+                    if (totalFull < blockForFullReplicas)
+                        throw UnavailableException.create(this, blockFor, total, blockForFullReplicas, totalFull);
                     break;
                 }
                 // Fallthough on purpose for SimpleStrategy
             default:
-                int live = Iterables.size(liveEndpoints);
-                if (live < blockFor)
+                int live = allLive.size();
+                int full = Replicas.countFull(allLive);
+                if (live < blockFor || full < blockForFullReplicas)
                 {
                     if (logger.isTraceEnabled())
-                        logger.trace("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(liveEndpoints), blockFor);
-                    throw new UnavailableException(this, blockFor, live);
+                        logger.trace("Live nodes {} do not satisfy ConsistencyLevel ({} required)", Iterables.toString(allLive), blockFor);
+                    throw UnavailableException.create(this, blockFor, blockForFullReplicas, live, full);
                 }
                 break;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org