You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/12/04 21:54:28 UTC
[1/3] cassandra git commit: Avoid MV race during node decommission
Repository: cassandra
Updated Branches:
refs/heads/trunk 646da9aa8 -> 9fb436f93
Avoid MV race during node decommission
Patch by Paulo Motta; reviewed by Joel Knighton for CASSANDRA-10674
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c184e8c1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c184e8c1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c184e8c1
Branch: refs/heads/trunk
Commit: c184e8c14b28eddc20cbdd098f5e47d1ed832898
Parents: a4da379
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Nov 25 14:50:31 2015 -0800
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri Dec 4 15:50:48 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/view/ViewUtils.java | 25 ++------
.../apache/cassandra/service/StorageProxy.java | 67 ++++++++++++--------
.../apache/cassandra/db/view/ViewUtilsTest.java | 42 ++++++++++--
4 files changed, 88 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f3f182..b95aa76 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.1
+ * Avoid MV race during node decommission (CASSANDRA-10674)
* Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
* Handle single-column deletions correction in materialized views
when the column is part of the view primary key (CASSANDRA-10796)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/src/java/org/apache/cassandra/db/view/ViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java
index 089a3b7..4d9517f 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUtils.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -21,13 +21,13 @@ package org.apache.cassandra.db.view;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
public final class ViewUtils
@@ -56,9 +56,9 @@ public final class ViewUtils
* B writes to A (B's cardinality is 2 for T1, and A's cardinality is 2 for T3)
* C writes to B (C's cardinality is 3 for T1, and B's cardinality is 3 for T3)
*
- * @throws RuntimeException if this method is called using a base token which does not belong to this replica
+ * @return Optional.empty() if this method is called using a base token which does not belong to this replica
*/
- public static InetAddress getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
+ public static Optional<InetAddress> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
{
AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
@@ -77,7 +77,7 @@ public final class ViewUtils
{
// If we are a base endpoint which is also a view replica, we use ourselves as our view replica
if (viewEndpoint.equals(FBUtilities.getBroadcastAddress()))
- return viewEndpoint;
+ return Optional.of(viewEndpoint);
// We have to remove any endpoint which is shared between the base and the view, as it will select itself
// and throw off the counts otherwise.
@@ -95,20 +95,9 @@ public final class ViewUtils
int baseIdx = baseEndpoints.indexOf(FBUtilities.getBroadcastAddress());
if (baseIdx < 0)
- {
-
- if (StorageService.instance.getTokenMetadata().pendingEndpointsFor(viewToken, keyspaceName).size() > 0)
- {
- //Since there are pending endpoints we are going to write to the batchlog regardless.
- //So we can pretend we are the views endpoint.
-
- return FBUtilities.getBroadcastAddress();
- }
-
- throw new RuntimeException("Trying to get the view natural endpoint on a non-data replica");
- }
-
+ //This node is not a base replica of this key, so we return empty
+ return Optional.empty();
- return viewEndpoints.get(baseIdx);
+ return Optional.of(viewEndpoints.get(baseIdx));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 15be7c6..397b8b9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -670,12 +670,12 @@ public class StorageProxy implements StorageProxyMBean
if (StorageService.instance.isStarting() || StorageService.instance.isJoining() || StorageService.instance.isMoving())
{
BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(),
- mutations),
- writeCommitLog);
+ mutations), writeCommitLog);
}
else
{
List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
+ List<Mutation> nonPairedMutations = new LinkedList<>();
Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey);
ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
@@ -684,40 +684,51 @@ public class StorageProxy implements StorageProxyMBean
final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress());
BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
() -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
-
// add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
for (Mutation mutation : mutations)
{
String keyspaceName = mutation.getKeyspaceName();
Token tk = mutation.key().getToken();
- InetAddress pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
- List<InetAddress> naturalEndpoints = Lists.newArrayList(pairedEndpoint);
-
- WriteResponseHandlerWrapper wrapper = wrapViewBatchResponseHandler(mutation,
- consistencyLevel,
- consistencyLevel,
- naturalEndpoints,
- baseComplete,
- WriteType.BATCH,
- cleanup);
-
- // When local node is the endpoint and there are no pending nodes we can
- // Just apply the mutation locally.
- if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty() && StorageService.instance.isJoined())
+ Optional<InetAddress> pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
+ Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
+
+ if (pairedEndpoint.isPresent())
{
- try
- {
- mutation.apply(writeCommitLog);
- }
- catch (Exception exc)
+ // When local node is the endpoint and there are no pending nodes we can
+ // Just apply the mutation locally.
+ if (pairedEndpoint.get().equals(FBUtilities.getBroadcastAddress())
+ && pendingEndpoints.isEmpty() && StorageService.instance.isJoined())
+ try
+ {
+ mutation.apply(writeCommitLog);
+ }
+ catch (Exception exc)
+ {
+ logger.error("Error applying local view update to keyspace {}: {}", mutation.getKeyspaceName(), mutation);
+ throw exc;
+ }
+ else
{
- logger.error("Error applying local view update to keyspace {}: {}", mutation.getKeyspaceName(), mutation);
- throw exc;
+ wrappers.add(wrapViewBatchResponseHandler(mutation,
+ consistencyLevel,
+ consistencyLevel,
+ Collections.singletonList(pairedEndpoint.get()),
+ baseComplete,
+ WriteType.BATCH,
+ cleanup));
}
}
else
{
- wrappers.add(wrapper);
+ //if there are no paired endpoints there are probably range movements going on,
+ //so we write to the local batchlog to replay later
+ if (pendingEndpoints.isEmpty())
+ logger.warn("Received base materialized view mutation for key %s that does not belong " +
+ "to this node. There is probably a range movement happening (move or decommission)," +
+ "but this node hasn't updated its ring metadata yet. Adding mutation to " +
+ "local batchlog to be replayed later.",
+ mutation.key());
+ nonPairedMutations.add(mutation);
}
}
@@ -730,6 +741,12 @@ public class StorageProxy implements StorageProxyMBean
// now actually perform the writes and wait for them to complete
asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION);
}
+
+ if (!nonPairedMutations.isEmpty())
+ {
+ BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), nonPairedMutations),
+ writeCommitLog);
+ }
}
}
finally
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
index 8fd0cfb..c238f36 100644
--- a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
+++ b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.view;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -74,11 +75,12 @@ public class ViewUtilsTest
KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
Schema.instance.setKeyspaceMetadata(meta);
- InetAddress naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+ Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
new StringToken("CA"),
new StringToken("BB"));
- Assert.assertEquals(InetAddress.getByName("127.0.0.2"), naturalEndpoint);
+ Assert.assertTrue(naturalEndpoint.isPresent());
+ Assert.assertEquals(InetAddress.getByName("127.0.0.2"), naturalEndpoint.get());
}
@@ -106,10 +108,42 @@ public class ViewUtilsTest
KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
Schema.instance.setKeyspaceMetadata(meta);
- InetAddress naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+ Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
new StringToken("CA"),
new StringToken("BB"));
- Assert.assertEquals(InetAddress.getByName("127.0.0.1"), naturalEndpoint);
+ Assert.assertTrue(naturalEndpoint.isPresent());
+ Assert.assertEquals(InetAddress.getByName("127.0.0.1"), naturalEndpoint.get());
+ }
+
+ @Test
+ public void testBaseTokenDoesNotBelongToLocalReplicaShouldReturnEmpty() throws Exception
+ {
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ metadata.clearUnsafe();
+
+ // DC1
+ metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+ metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+
+ // DC2
+ metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
+ metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+
+ Map<String, String> replicationMap = new HashMap<>();
+ replicationMap.put(ReplicationParams.CLASS, NetworkTopologyStrategy.class.getName());
+
+ replicationMap.put("DC1", "1");
+ replicationMap.put("DC2", "1");
+
+ Keyspace.clear("Keyspace1");
+ KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
+ Schema.instance.setKeyspaceMetadata(meta);
+
+ Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+ new StringToken("AB"),
+ new StringToken("BB"));
+
+ Assert.assertFalse(naturalEndpoint.isPresent());
}
}
[3/3] cassandra git commit: Merge branch 'cassandra-3.1' into trunk
Posted by ja...@apache.org.
Merge branch 'cassandra-3.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9fb436f9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9fb436f9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9fb436f9
Branch: refs/heads/trunk
Commit: 9fb436f936d9dc9740273b147c9c4a960638affc
Parents: 646da9a ee46dfb
Author: T Jake Luciani <ja...@apache.org>
Authored: Fri Dec 4 15:54:15 2015 -0500
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri Dec 4 15:54:15 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/view/ViewUtils.java | 25 ++------
.../apache/cassandra/service/StorageProxy.java | 67 ++++++++++++--------
.../apache/cassandra/db/view/ViewUtilsTest.java | 42 ++++++++++--
4 files changed, 88 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fb436f9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9a0e5ce,a2e9434..0511ed4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,6 +1,21 @@@
+3.2
+ * bound maximum in-flight commit log replay mutation bytes to 64 megabytes (CASSANDRA-8639)
+ * Normalize all scripts (CASSANDRA-10679)
+ * Make compression ratio much more accurate (CASSANDRA-10225)
+ * Optimize building of Clustering object when only one is created (CASSANDRA-10409)
+ * Make index building pluggable (CASSANDRA-10681)
+ * Add sstable flush observer (CASSANDRA-10678)
+ * Improve NTS endpoints calculation (CASSANDRA-10200)
+ * Improve performance of the folderSize function (CASSANDRA-10677)
+ * Add support for type casting in selection clause (CASSANDRA-10310)
+ * Added graphing option to cassandra-stress (CASSANDRA-7918)
+ * Abort in-progress queries that time out (CASSANDRA-7392)
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+
+
3.1
Merged from 3.0:
+ * Avoid MV race during node decommission (CASSANDRA-10674)
* Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
* Handle single-column deletions correction in materialized views
when the column is part of the view primary key (CASSANDRA-10796)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fb436f9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
[2/3] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.1
Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.1
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee46dfbf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee46dfbf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee46dfbf
Branch: refs/heads/trunk
Commit: ee46dfbf934fc91d8d4865bef174b68174790edd
Parents: 2b8cdc2 c184e8c
Author: T Jake Luciani <ja...@apache.org>
Authored: Fri Dec 4 15:53:44 2015 -0500
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri Dec 4 15:53:44 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/view/ViewUtils.java | 25 ++------
.../apache/cassandra/service/StorageProxy.java | 67 ++++++++++++--------
.../apache/cassandra/db/view/ViewUtilsTest.java | 42 ++++++++++--
4 files changed, 88 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee46dfbf/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e06653f,b95aa76..a2e9434
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,5 -1,5 +1,6 @@@
-3.0.1
+3.1
+Merged from 3.0:
+ * Avoid MV race during node decommission (CASSANDRA-10674)
* Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
* Handle single-column deletions correction in materialized views
when the column is part of the view primary key (CASSANDRA-10796)