You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/28 19:50:43 UTC
svn commit: r1140751 - in /cassandra/branches/cassandra-0.8: ./ contrib/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/net/
Author: jbellis
Date: Tue Jun 28 17:50:43 2011
New Revision: 1140751
URL: http://svn.apache.org/viewvc?rev=1140751&view=rev
Log:
fix Message version propagation fromold nodes to new ones
patch by brandonwilliams and jbellis for CASSANDRA-2818
Modified:
cassandra/branches/cassandra-0.8/ (props changed)
cassandra/branches/cassandra-0.8/contrib/ (props changed)
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
Propchange: cassandra/branches/cassandra-0.8/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7:1026516-1140565
+/cassandra/branches/cassandra-0.7:1026516-1140567
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/branches/cassandra-0.8:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0:1125021-1130369
Propchange: cassandra/branches/cassandra-0.8/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1140565
+/cassandra/branches/cassandra-0.7/contrib:1026516-1140567
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1140565
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1140567
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1140565
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1140567
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1140565
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1140567
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1140565
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1140567
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
Propchange: cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jun 28 17:50:43 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1140565
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1140567
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125041
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1140751&r1=1140750&r2=1140751&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java Tue Jun 28 17:50:43 2011
@@ -871,17 +871,13 @@ public class Gossiper implements IFailur
*/
public void addSavedEndpoint(InetAddress ep)
{
- EndpointState epState = endpointStateMap.get(ep);
- if (epState == null)
- {
- epState = new EndpointState(new HeartBeatState(0));
- epState.markDead();
- epState.setHasToken(true);
- endpointStateMap.put(ep, epState);
- unreachableEndpoints.put(ep, System.currentTimeMillis());
- if (logger.isTraceEnabled())
- logger.trace("Adding saved endpoint " + ep + " " + epState.getHeartBeatState().getGeneration());
- }
+ EndpointState epState = new EndpointState(new HeartBeatState(0));
+ epState.markDead();
+ epState.setHasToken(true);
+ endpointStateMap.put(ep, epState);
+ unreachableEndpoints.put(ep, System.currentTimeMillis());
+ if (logger.isTraceEnabled())
+ logger.trace("Adding saved endpoint " + ep + " " + epState.getHeartBeatState().getGeneration());
}
public void addLocalApplicationState(ApplicationState state, VersionedValue value)
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1140751&r1=1140750&r2=1140751&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Jun 28 17:50:43 2011
@@ -70,25 +70,43 @@ public class IncomingTcpConnection exten
// we should buffer
input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
version = MessagingService.getBits(header, 15, 8);
- Gossiper.instance.setVersion(socket.getInetAddress(), version);
+ if (logger.isDebugEnabled())
+ logger.debug("Version for " + socket.getInetAddress() + " is " + version);
}
catch (IOException e)
{
close();
throw new IOError(e);
}
+
+ if (version > MessagingService.version_)
+ {
+ // save the endpoint so gossip will reconnect to it
+ Gossiper.instance.addSavedEndpoint(socket.getInetAddress());
+ logger.info("Received " + (isStream ? "streaming " : "") + "connection from newer protocol version. Ignorning");
+
+ // streaming connections are per-session and have a fixed version. we can't do anything with a new-version
+ // stream connection, so drop it.
+ if (isStream)
+ {
+ close();
+ return;
+ }
+ // for non-streaming connections, continue to read the messages (and ignore them) until sender
+ // starts sending correct-version messages (which it can do without reconnecting -- version is per-Message)
+ }
+ else
+ {
+ // only set version when <= to us, otherwise it's the responsibility of the other end to mimic us
+ Gossiper.instance.setVersion(socket.getInetAddress(), version);
+ }
+
while (true)
{
try
{
if (isStream)
{
- if (version > MessagingService.version_)
- {
- logger.error("Received untranslated stream from newer protcol version. Terminating connection!");
- close();
- return;
- }
int size = input.readInt();
byte[] headerBytes = new byte[size];
input.readFully(headerBytes);
@@ -106,11 +124,8 @@ public class IncomingTcpConnection exten
input.readFully(contentBytes, offset, CHUNK_SIZE);
input.readFully(contentBytes, size - remainder, remainder);
- if (version > MessagingService.version_)
- logger.info("Received connection from newer protocol version. Ignorning message.");
- else
+ if (version <= MessagingService.version_)
{
- // todo: need to be aware of message version.
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes));
String id = dis.readUTF();
Message message = Message.serializer().deserialize(dis, version);
@@ -120,9 +135,8 @@ public class IncomingTcpConnection exten
// prepare to read the next message
MessagingService.validateMagic(input.readInt());
int header = input.readInt();
- version = MessagingService.getBits(header, 15, 8);
assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream;
- assert version == MessagingService.getBits(header, 15, 8) : "Protocol version shouldn't change during a session";
+ version = MessagingService.getBits(header, 15, 8);
}
catch (EOFException e)
{