You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/01/28 16:20:38 UTC
[1/3] git commit: Fix NPE when streaming connection is not yet ready
Updated Branches:
refs/heads/cassandra-2.0 20c2adc87 -> 41ffca128
refs/heads/trunk 025474177 -> 9c9552aea
Fix NPE when streaming connection is not yet ready
patch by yukim; reviewed by Russell Spitzer for CASSANDRA-6210
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/41ffca12
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/41ffca12
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/41ffca12
Branch: refs/heads/cassandra-2.0
Commit: 41ffca1281dcdc69b1b843b47a5bb6dc3c462aac
Parents: 20c2adc
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jan 28 09:17:30 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jan 28 09:17:30 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 117 +++++++++----------
.../streaming/messages/StreamMessage.java | 3 +-
3 files changed, 61 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/41ffca12/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 68727dc..46b14fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* Fix LOCAL_SERIAL from thrift (CASSANDRA-6584)
* Don't special case received counts in CAS timeout exceptions (CASSANDRA-6595)
* Add support for 2.1 global counter shards (CASSANDRA-6505)
+ * Fix NPE when streaming connection is not yet established (CASSANDRA-6210)
Merged from 1.2:
* fsync compression metadata (CASSANDRA-6531)
* Validate CF existence on execution for prepared statement (CASSANDRA-6535)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/41ffca12/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 57f76a7..356138b 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
+import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
@@ -64,6 +65,8 @@ public class ConnectionHandler
ConnectionHandler(StreamSession session)
{
this.session = session;
+ this.incoming = new IncomingMessageHandler(session);
+ this.outgoing = new OutgoingMessageHandler(session);
}
/**
@@ -77,15 +80,13 @@ public class ConnectionHandler
{
logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
Socket incomingSocket = connect(session.peer);
- incoming = new IncomingMessageHandler(session, incomingSocket, StreamMessage.CURRENT_VERSION);
- incoming.sendInitMessage(true);
- incoming.start();
+ incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
+ incoming.sendInitMessage(incomingSocket, true);
logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
Socket outgoingSocket = connect(session.peer);
- outgoing = new OutgoingMessageHandler(session, outgoingSocket, StreamMessage.CURRENT_VERSION);
- outgoing.sendInitMessage(false);
- outgoing.start();
+ outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
+ outgoing.sendInitMessage(outgoingSocket, false);
}
/**
@@ -98,15 +99,9 @@ public class ConnectionHandler
public void initiateOnReceivingSide(Socket socket, boolean isForOutgoing, int version) throws IOException
{
if (isForOutgoing)
- {
- outgoing = new OutgoingMessageHandler(session, socket, version);
- outgoing.start();
- }
+ outgoing.start(socket, version);
else
- {
- incoming = new IncomingMessageHandler(session, socket, version);
- incoming.start();
- }
+ incoming.start(socket, version);
}
/**
@@ -189,21 +184,19 @@ public class ConnectionHandler
{
protected final StreamSession session;
- protected final Socket socket;
- protected final int protocolVersion;
+ protected int protocolVersion;
+ protected Socket socket;
private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
- protected MessageHandler(StreamSession session, Socket socket, int protocolVersion)
+ protected MessageHandler(StreamSession session)
{
this.session = session;
- this.socket = socket;
- this.protocolVersion = protocolVersion;
}
protected abstract String name();
- protected WritableByteChannel getWriteChannel() throws IOException
+ protected static WritableByteChannel getWriteChannel(Socket socket) throws IOException
{
WritableByteChannel out = socket.getChannel();
// socket channel is null when encrypted(SSL)
@@ -212,7 +205,7 @@ public class ConnectionHandler
: out;
}
- protected ReadableByteChannel getReadChannel() throws IOException
+ protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException
{
ReadableByteChannel in = socket.getChannel();
// socket channel is null when encrypted(SSL)
@@ -221,14 +214,19 @@ public class ConnectionHandler
: in;
}
- public void sendInitMessage(boolean isForOutgoing) throws IOException
+ public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
{
StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(), session.planId(), session.description(), isForOutgoing);
- getWriteChannel().write(message.createMessage(false, protocolVersion));
+ ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
+ while (messageBuf.hasRemaining())
+ getWriteChannel(socket).write(messageBuf);
}
- public void start()
+ public void start(Socket socket, int protocolVersion)
{
+ this.socket = socket;
+ this.protocolVersion = protocolVersion;
+
new Thread(this, name() + "-" + session.peer).start();
}
@@ -264,12 +262,9 @@ public class ConnectionHandler
*/
static class IncomingMessageHandler extends MessageHandler
{
- private final ReadableByteChannel in;
-
- IncomingMessageHandler(StreamSession session, Socket socket, int protocolVersion) throws IOException
+ IncomingMessageHandler(StreamSession session)
{
- super(session, socket, protocolVersion);
- this.in = getReadChannel();
+ super(session);
}
protected String name()
@@ -279,9 +274,10 @@ public class ConnectionHandler
public void run()
{
- while (!isClosed())
+ try
{
- try
+ ReadableByteChannel in = getReadChannel(socket);
+ while (!isClosed())
{
// receive message
StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session);
@@ -293,17 +289,20 @@ public class ConnectionHandler
session.messageReceived(message);
}
}
- catch (SocketException e)
- {
- // socket is closed
- close();
- }
- catch (Throwable e)
- {
- session.onError(e);
- }
}
- signalCloseDone();
+ catch (SocketException e)
+ {
+ // socket is closed
+ close();
+ }
+ catch (Throwable e)
+ {
+ session.onError(e);
+ }
+ finally
+ {
+ signalCloseDone();
+ }
}
}
@@ -326,12 +325,9 @@ public class ConnectionHandler
}
});
- private final WritableByteChannel out;
-
- OutgoingMessageHandler(StreamSession session, Socket socket, int protocolVersion) throws IOException
+ OutgoingMessageHandler(StreamSession session)
{
- super(session, socket, protocolVersion);
- this.out = getWriteChannel();
+ super(session);
}
protected String name()
@@ -346,30 +342,33 @@ public class ConnectionHandler
public void run()
{
- StreamMessage next;
- while (!isClosed())
+ try
{
- try
+ WritableByteChannel out = getWriteChannel(socket);
+
+ StreamMessage next;
+ while (!isClosed())
{
if ((next = messageQueue.poll(1, TimeUnit.SECONDS)) != null)
{
logger.debug("[Stream #{}] Sending {}", session.planId(), next);
- sendMessage(next);
+ sendMessage(out, next);
if (next.type == StreamMessage.Type.SESSION_FAILED)
close();
}
}
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
- try
- {
// Sends the last messages on the queue
while ((next = messageQueue.poll()) != null)
- sendMessage(next);
+ sendMessage(out, next);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ catch (IOException e)
+ {
+ session.onError(e);
}
finally
{
@@ -377,7 +376,7 @@ public class ConnectionHandler
}
}
- private void sendMessage(StreamMessage message)
+ private void sendMessage(WritableByteChannel out, StreamMessage message)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/41ffca12/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 2e7341b..9e146e8 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -40,7 +40,8 @@ public abstract class StreamMessage
// message type
buff.put(message.type.type);
buff.flip();
- out.write(buff);
+ while (buff.hasRemaining())
+ out.write(buff);
message.type.serializer.serialize(message, out, version, session);
}
[3/3] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c9552ae
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c9552ae
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c9552ae
Branch: refs/heads/trunk
Commit: 9c9552aead388dbc777d1e9219a92fcd0830b290
Parents: 0254741 41ffca1
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jan 28 09:19:05 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jan 28 09:19:05 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 117 +++++++++----------
.../streaming/messages/StreamMessage.java | 3 +-
3 files changed, 61 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c9552ae/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c9552ae/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
[2/3] git commit: Fix NPE when streaming connection is not yet ready
Posted by yu...@apache.org.
Fix NPE when streaming connection is not yet ready
patch by yukim; reviewed by Russell Spitzer for CASSANDRA-6210
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/41ffca12
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/41ffca12
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/41ffca12
Branch: refs/heads/trunk
Commit: 41ffca1281dcdc69b1b843b47a5bb6dc3c462aac
Parents: 20c2adc
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jan 28 09:17:30 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jan 28 09:17:30 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/ConnectionHandler.java | 117 +++++++++----------
.../streaming/messages/StreamMessage.java | 3 +-
3 files changed, 61 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/41ffca12/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 68727dc..46b14fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* Fix LOCAL_SERIAL from thrift (CASSANDRA-6584)
* Don't special case received counts in CAS timeout exceptions (CASSANDRA-6595)
* Add support for 2.1 global counter shards (CASSANDRA-6505)
+ * Fix NPE when streaming connection is not yet established (CASSANDRA-6210)
Merged from 1.2:
* fsync compression metadata (CASSANDRA-6531)
* Validate CF existence on execution for prepared statement (CASSANDRA-6535)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/41ffca12/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 57f76a7..356138b 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
+import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
@@ -64,6 +65,8 @@ public class ConnectionHandler
ConnectionHandler(StreamSession session)
{
this.session = session;
+ this.incoming = new IncomingMessageHandler(session);
+ this.outgoing = new OutgoingMessageHandler(session);
}
/**
@@ -77,15 +80,13 @@ public class ConnectionHandler
{
logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
Socket incomingSocket = connect(session.peer);
- incoming = new IncomingMessageHandler(session, incomingSocket, StreamMessage.CURRENT_VERSION);
- incoming.sendInitMessage(true);
- incoming.start();
+ incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
+ incoming.sendInitMessage(incomingSocket, true);
logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
Socket outgoingSocket = connect(session.peer);
- outgoing = new OutgoingMessageHandler(session, outgoingSocket, StreamMessage.CURRENT_VERSION);
- outgoing.sendInitMessage(false);
- outgoing.start();
+ outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
+ outgoing.sendInitMessage(outgoingSocket, false);
}
/**
@@ -98,15 +99,9 @@ public class ConnectionHandler
public void initiateOnReceivingSide(Socket socket, boolean isForOutgoing, int version) throws IOException
{
if (isForOutgoing)
- {
- outgoing = new OutgoingMessageHandler(session, socket, version);
- outgoing.start();
- }
+ outgoing.start(socket, version);
else
- {
- incoming = new IncomingMessageHandler(session, socket, version);
- incoming.start();
- }
+ incoming.start(socket, version);
}
/**
@@ -189,21 +184,19 @@ public class ConnectionHandler
{
protected final StreamSession session;
- protected final Socket socket;
- protected final int protocolVersion;
+ protected int protocolVersion;
+ protected Socket socket;
private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
- protected MessageHandler(StreamSession session, Socket socket, int protocolVersion)
+ protected MessageHandler(StreamSession session)
{
this.session = session;
- this.socket = socket;
- this.protocolVersion = protocolVersion;
}
protected abstract String name();
- protected WritableByteChannel getWriteChannel() throws IOException
+ protected static WritableByteChannel getWriteChannel(Socket socket) throws IOException
{
WritableByteChannel out = socket.getChannel();
// socket channel is null when encrypted(SSL)
@@ -212,7 +205,7 @@ public class ConnectionHandler
: out;
}
- protected ReadableByteChannel getReadChannel() throws IOException
+ protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException
{
ReadableByteChannel in = socket.getChannel();
// socket channel is null when encrypted(SSL)
@@ -221,14 +214,19 @@ public class ConnectionHandler
: in;
}
- public void sendInitMessage(boolean isForOutgoing) throws IOException
+ public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
{
StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(), session.planId(), session.description(), isForOutgoing);
- getWriteChannel().write(message.createMessage(false, protocolVersion));
+ ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
+ while (messageBuf.hasRemaining())
+ getWriteChannel(socket).write(messageBuf);
}
- public void start()
+ public void start(Socket socket, int protocolVersion)
{
+ this.socket = socket;
+ this.protocolVersion = protocolVersion;
+
new Thread(this, name() + "-" + session.peer).start();
}
@@ -264,12 +262,9 @@ public class ConnectionHandler
*/
static class IncomingMessageHandler extends MessageHandler
{
- private final ReadableByteChannel in;
-
- IncomingMessageHandler(StreamSession session, Socket socket, int protocolVersion) throws IOException
+ IncomingMessageHandler(StreamSession session)
{
- super(session, socket, protocolVersion);
- this.in = getReadChannel();
+ super(session);
}
protected String name()
@@ -279,9 +274,10 @@ public class ConnectionHandler
public void run()
{
- while (!isClosed())
+ try
{
- try
+ ReadableByteChannel in = getReadChannel(socket);
+ while (!isClosed())
{
// receive message
StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session);
@@ -293,17 +289,20 @@ public class ConnectionHandler
session.messageReceived(message);
}
}
- catch (SocketException e)
- {
- // socket is closed
- close();
- }
- catch (Throwable e)
- {
- session.onError(e);
- }
}
- signalCloseDone();
+ catch (SocketException e)
+ {
+ // socket is closed
+ close();
+ }
+ catch (Throwable e)
+ {
+ session.onError(e);
+ }
+ finally
+ {
+ signalCloseDone();
+ }
}
}
@@ -326,12 +325,9 @@ public class ConnectionHandler
}
});
- private final WritableByteChannel out;
-
- OutgoingMessageHandler(StreamSession session, Socket socket, int protocolVersion) throws IOException
+ OutgoingMessageHandler(StreamSession session)
{
- super(session, socket, protocolVersion);
- this.out = getWriteChannel();
+ super(session);
}
protected String name()
@@ -346,30 +342,33 @@ public class ConnectionHandler
public void run()
{
- StreamMessage next;
- while (!isClosed())
+ try
{
- try
+ WritableByteChannel out = getWriteChannel(socket);
+
+ StreamMessage next;
+ while (!isClosed())
{
if ((next = messageQueue.poll(1, TimeUnit.SECONDS)) != null)
{
logger.debug("[Stream #{}] Sending {}", session.planId(), next);
- sendMessage(next);
+ sendMessage(out, next);
if (next.type == StreamMessage.Type.SESSION_FAILED)
close();
}
}
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
- try
- {
// Sends the last messages on the queue
while ((next = messageQueue.poll()) != null)
- sendMessage(next);
+ sendMessage(out, next);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ catch (IOException e)
+ {
+ session.onError(e);
}
finally
{
@@ -377,7 +376,7 @@ public class ConnectionHandler
}
}
- private void sendMessage(StreamMessage message)
+ private void sendMessage(WritableByteChannel out, StreamMessage message)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/41ffca12/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 2e7341b..9e146e8 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -40,7 +40,8 @@ public abstract class StreamMessage
// message type
buff.put(message.type.type);
buff.flip();
- out.write(buff);
+ while (buff.hasRemaining())
+ out.write(buff);
message.type.serializer.serialize(message, out, version, session);
}