You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/09/30 17:49:52 UTC
[1/2] cassandra git commit: Suppress some pushed events when
rpc_address is shared
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 fc675450e -> 7452b2050
Suppress some pushed events when rpc_address is shared
Patch by Stefania Alborghetti; reviewed by Olivier Michallat and Tyler Hobbs for
CASSANDRA-10052
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f6cab37d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f6cab37d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f6cab37d
Branch: refs/heads/cassandra-2.2
Commit: f6cab37d5ee42313c7a5618c5d0694f312c9c194
Parents: 4c6411f
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Wed Sep 30 10:46:34 2015 -0500
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Wed Sep 30 10:46:34 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/cassandra/transport/Event.java | 29 ++++++++----
.../org/apache/cassandra/transport/Server.java | 48 ++++++++++++++------
3 files changed, 57 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6cab37d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3364dcd..0ad2b36 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.1.10
+ * Avoid misleading pushed notifications when multiple nodes
+ share an rpc_address (CASSANDRA-10052)
* Fix dropping undroppable when message queue is full (CASSANDRA-10113)
* Fix potential ClassCastException during paging (CASSANDRA-10352)
* Prevent ALTER TYPE from creating circular references (CASSANDRA-10339)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6cab37d/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index b7c5e68..12ad6e9 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -62,18 +62,32 @@ public abstract class Event
protected abstract void serializeEvent(ByteBuf dest, int version);
protected abstract int eventSerializedSize(int version);
- public static class TopologyChange extends Event
+ public static abstract class NodeEvent extends Event
+ {
+ public final InetSocketAddress node;
+
+ public InetAddress nodeAddress()
+ {
+ return node.getAddress();
+ }
+
+ private NodeEvent(Type type, InetSocketAddress node)
+ {
+ super(type);
+ this.node = node;
+ }
+ }
+
+ public static class TopologyChange extends NodeEvent
{
public enum Change { NEW_NODE, REMOVED_NODE, MOVED_NODE }
public final Change change;
- public final InetSocketAddress node;
private TopologyChange(Change change, InetSocketAddress node)
{
- super(Type.TOPOLOGY_CHANGE);
+ super(Type.TOPOLOGY_CHANGE, node);
this.change = change;
- this.node = node;
}
public static TopologyChange newNode(InetAddress host, int port)
@@ -134,18 +148,17 @@ public abstract class Event
}
}
- public static class StatusChange extends Event
+
+ public static class StatusChange extends NodeEvent
{
public enum Status { UP, DOWN }
public final Status status;
- public final InetSocketAddress node;
private StatusChange(Status status, InetSocketAddress node)
{
- super(Type.STATUS_CHANGE);
+ super(Type.STATUS_CHANGE, node);
this.status = status;
- this.node = node;
}
public static StatusChange nodeUp(InetAddress host, int port)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6cab37d/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 8f0f89f..c21a669 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -55,6 +55,7 @@ import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.ssl.SslHandler;
+import org.apache.cassandra.utils.FBUtilities;
public class Server implements CassandraDaemon.Server
{
@@ -381,78 +382,97 @@ public class Server implements CassandraDaemon.Server
}
}
+ private void send(InetAddress endpoint, Event.NodeEvent event)
+ {
+ // If the endpoint is not the local node, extract the node address
+ // and if it is the same as our own RPC broadcast address (which defaults to the rpc address)
+ // then don't send the notification. This covers the case of rpc_address set to "localhost",
+ // which is not useful to any driver and in fact may cause serious problems to some drivers,
+ // see CASSANDRA-10052
+ if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
+ event.nodeAddress().equals(DatabaseDescriptor.getBroadcastRpcAddress()))
+ return;
+
+ send(event);
+ }
+
+ private void send(Event event)
+ {
+ server.connectionTracker.send(event);
+ }
+
public void onJoinCluster(InetAddress endpoint)
{
- server.connectionTracker.send(Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
+ send(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
}
public void onLeaveCluster(InetAddress endpoint)
{
- server.connectionTracker.send(Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort()));
+ send(endpoint, Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort()));
}
public void onMove(InetAddress endpoint)
{
- server.connectionTracker.send(Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort()));
+ send(endpoint, Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort()));
}
public void onUp(InetAddress endpoint)
{
Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.UP);
if (prev == null || prev != Event.StatusChange.Status.UP)
- server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
+ send(endpoint, Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
}
public void onDown(InetAddress endpoint)
{
Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.DOWN);
if (prev == null || prev != Event.StatusChange.Status.DOWN)
- server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
+ send(endpoint, Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
}
public void onCreateKeyspace(String ksName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName));
}
public void onCreateColumnFamily(String ksName, String cfName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
}
public void onCreateUserType(String ksName, String typeName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
public void onUpdateKeyspace(String ksName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
}
public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
}
public void onUpdateUserType(String ksName, String typeName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
public void onDropKeyspace(String ksName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName));
}
public void onDropColumnFamily(String ksName, String cfName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName));
}
public void onDropUserType(String ksName, String typeName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
}
}
[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by ty...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7452b205
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7452b205
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7452b205
Branch: refs/heads/cassandra-2.2
Commit: 7452b20503c376c9ea15fdfac8da0c78381b3f73
Parents: fc67545 f6cab37
Author: Tyler Hobbs <ty...@gmail.com>
Authored: Wed Sep 30 10:49:42 2015 -0500
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Wed Sep 30 10:49:42 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/cassandra/transport/Event.java | 29 +++++---
.../org/apache/cassandra/transport/Server.java | 71 +++++++++++++-------
3 files changed, 70 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7452b205/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c36c6f5,0ad2b36..45070b2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,6 +1,18 @@@
-2.1.10
+2.2.2
+ * cqlsh prompt includes name of keyspace after failed `use` statement (CASSANDRA-10369)
+ * Configurable page size in cqlsh (CASSANDRA-9855)
+ * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761)
+ * Cancel transaction for sstables we won't redistribute index summary
+ for (CASSANDRA-10270)
+ * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209)
+ * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222)
+ * Fix failure to start with space in directory path on Windows (CASSANDRA-10239)
+ * Fix repair hang when snapshot failed (CASSANDRA-10057)
+ * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks
+ (CASSANDRA-10199)
+Merged from 2.1:
+ * Avoid misleading pushed notifications when multiple nodes
+ share an rpc_address (CASSANDRA-10052)
* Fix dropping undroppable when message queue is full (CASSANDRA-10113)
* Fix potential ClassCastException during paging (CASSANDRA-10352)
* Prevent ALTER TYPE from creating circular references (CASSANDRA-10339)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7452b205/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7452b205/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/Server.java
index 72a1b60,c21a669..d610bff
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@@ -55,6 -50,12 +55,7 @@@ import org.apache.cassandra.metrics.Cli
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.*;
import org.apache.cassandra.transport.messages.EventMessage;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.*;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.handler.ssl.SslHandler;
+ import org.apache.cassandra.utils.FBUtilities;
public class Server implements CassandraDaemon.Server
{
@@@ -409,9 -382,28 +410,31 @@@
}
}
+ private void send(InetAddress endpoint, Event.NodeEvent event)
+ {
++ if (logger.isTraceEnabled())
++ logger.trace("Sending event for endpoint {}, rpc address {}", endpoint, event.nodeAddress());
++
+ // If the endpoint is not the local node, extract the node address
+ // and if it is the same as our own RPC broadcast address (which defaults to the rpc address)
+ // then don't send the notification. This covers the case of rpc_address set to "localhost",
+ // which is not useful to any driver and in fact may cause serious problems to some drivers,
+ // see CASSANDRA-10052
+ if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
+ event.nodeAddress().equals(DatabaseDescriptor.getBroadcastRpcAddress()))
+ return;
+
+ send(event);
+ }
+
+ private void send(Event event)
+ {
+ server.connectionTracker.send(event);
+ }
+
public void onJoinCluster(InetAddress endpoint)
{
- send(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
+ onTopologyChange(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
}
public void onLeaveCluster(InetAddress endpoint)
@@@ -431,35 -425,9 +454,35 @@@
public void onDown(InetAddress endpoint)
{
- Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.DOWN);
- if (prev == null || prev != Event.StatusChange.Status.DOWN)
- send(endpoint, Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
+ onStatusChange(endpoint, Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
+ }
+
+ private void onTopologyChange(InetAddress endpoint, Event.TopologyChange event)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Topology changed event : {}, {}", endpoint, event.change);
+
+ LatestEvent prev = latestEvents.get(endpoint);
+ if (prev == null || prev.topology != event.change)
+ {
+ LatestEvent ret = latestEvents.put(endpoint, LatestEvent.forTopologyChange(event.change, prev));
+ if (ret == prev)
- server.connectionTracker.send(event);
++ send(endpoint, event);
+ }
+ }
+
+ private void onStatusChange(InetAddress endpoint, Event.StatusChange event)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Status changed event : {}, {}", endpoint, event.status);
+
+ LatestEvent prev = latestEvents.get(endpoint);
+ if (prev == null || prev.status != event.status)
+ {
- LatestEvent ret = latestEvents.put(endpoint, LatestEvent.forStatusChange(event.status, prev));
++ LatestEvent ret = latestEvents.put(endpoint, LatestEvent.forStatusChange(event.status, null));
+ if (ret == prev)
- server.connectionTracker.send(event);
++ send(endpoint, event);
+ }
}
public void onCreateKeyspace(String ksName)
@@@ -474,24 -442,12 +497,24 @@@
public void onCreateUserType(String ksName, String typeName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
+ public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+ {
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION,
- ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
++ send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION,
++ ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
+ }
+
+ public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+ {
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE,
- ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
++ send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE,
++ ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
+ }
+
public void onUpdateKeyspace(String ksName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
}
public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange)
@@@ -501,24 -457,12 +524,24 @@@
public void onUpdateUserType(String ksName, String typeName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
+ public void onUpdateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+ {
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION,
- ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
++ send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION,
++ ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
+ }
+
+ public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+ {
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE,
- ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
++ send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE,
++ ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
+ }
+
public void onDropKeyspace(String ksName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName));
}
public void onDropColumnFamily(String ksName, String cfName)
@@@ -528,19 -472,7 +551,19 @@@
public void onDropUserType(String ksName, String typeName)
{
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
+ send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
}
+
+ public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
+ {
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION,
- ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
++ send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION,
++ ksName, functionName, AbstractType.asCQLTypeStringList(argTypes)));
+ }
+
+ public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
+ {
- server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE,
- ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
++ send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE,
++ ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes)));
+ }
}
}