You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/06/28 08:28:08 UTC
[13/21] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3b448b33
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3b448b33
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3b448b33
Branch: refs/heads/trunk
Commit: 3b448b33785c86f554bc07266de1c39e0ac502d9
Parents: 6fcf335 2811f15
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 28 10:16:49 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 28 10:16:49 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../net/IncomingStreamingConnection.java | 7 ++--
.../apache/cassandra/net/MessagingService.java | 12 +++++-
.../cassandra/streaming/ConnectionHandler.java | 39 +++++++++++++++-----
.../cassandra/streaming/StreamResultFuture.java | 27 +++++++-------
.../streaming/StreamingTransferTest.java | 23 ++++++++++++
6 files changed, 80 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b448b33/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8fbdf3b,5741241..8d2062d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,35 -1,5 +1,36 @@@
-2.1.15
+2.2.7
+ * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
+ * Validate bloom_filter_fp_chance against lowest supported
+ value when the table is created (CASSANDRA-11920)
+ * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013)
+ * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
+ * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
+ * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
+ * Run CommitLog tests with different compression settings (CASSANDRA-9039)
+ * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
+ * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
+ * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
+ * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
+ * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
+ * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
+ * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
+ * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
+ * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
+ * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
+ * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
+ * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
+ * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
+ * JSON datetime formatting needs timezone (CASSANDRA-11137)
+ * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
+ * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660)
+ * Add missing files to debian packages (CASSANDRA-11642)
+ * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
+ * cqlsh: COPY FROM should use regular inserts for single statement batches and
+ report errors correctly if workers processes crash on initialization (CASSANDRA-11474)
+ * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
+ * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
+Merged from 2.1:
+ * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
* Don't try to get sstables for non-repairing column families (CASSANDRA-12077)
* Prevent select statements with clustering key > 64k (CASSANDRA-11882)
* Avoid marking too many sstables as repaired (CASSANDRA-11696)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b448b33/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 79a814d,bfe92f9..ab262c7
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@@ -72,13 -71,11 +72,12 @@@ public class IncomingStreamingConnectio
// The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing.
// Note: we cannot use the same socket for incoming and outgoing streams because we want to
// parallelize said streams and the socket is blocking, so we might deadlock.
- StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental);
- StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version);
++ StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental);
}
catch (IOException e)
{
- logger.debug("IOException reading from socket; closing", e);
+ logger.error(String.format("IOException while reading from socket from %s, closing: %s",
+ socket.getRemoteSocketAddress(), e));
- logger.trace(String.format("IOException while reading from socket from %s, closing", socket.getRemoteSocketAddress()), e);
close();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b448b33/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 459923b,ac8ad79..8c8a333
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -1178,14 -1136,21 +1180,20 @@@ public final class MessagingService imp
return result;
}
- public Map<String, Long> getRecentTimeoutsPerHost()
+ public static IPartitioner globalPartitioner()
{
- Map<String, Long> result = new HashMap<String, Long>();
- for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry: connectionManagers.entrySet())
- {
- String ip = entry.getKey().getHostAddress();
- long recent = entry.getValue().getRecentTimeouts();
- result.put(ip, recent);
- }
- return result;
+ return DatabaseDescriptor.getPartitioner();
+ }
+
+ public static void validatePartitioner(AbstractBounds<?> bounds)
+ {
+ if (globalPartitioner() != bounds.left.getPartitioner())
+ throw new AssertionError();
}
+
+ @VisibleForTesting
+ public List<SocketThread> getSocketThreads()
+ {
+ return socketThreads;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b448b33/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d4662c7,60ce11e..c497a39
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@@ -34,12 -33,11 +34,13 @@@ import java.util.concurrent.atomic.Atom
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+ import org.apache.cassandra.net.IncomingStreamingConnection;
import org.apache.cassandra.streaming.messages.StreamInitMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
@@@ -192,15 -188,17 +194,21 @@@ public class ConnectionHandle
session.sessionIndex(),
session.planId(),
session.description(),
- isForOutgoing);
+ isForOutgoing,
+ session.keepSSTableLevel(),
+ session.isIncremental());
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
- getWriteChannel(socket).write(messageBuf);
+ DataOutputStreamPlus out = getWriteChannel(socket);
+ out.write(messageBuf);
+ out.flush();
}
+ public void start(IncomingStreamingConnection connection, int protocolVersion)
+ {
+ this.incomingConnection = connection;
+ start(connection.socket, protocolVersion);
+ }
+
public void start(Socket socket, int protocolVersion)
{
this.socket = socket;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b448b33/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 99adab0,5c9c6de..b299b87
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@@ -99,11 -100,9 +100,11 @@@ public final class StreamResultFuture e
UUID planId,
String description,
InetAddress from,
- Socket socket,
+ IncomingStreamingConnection connection,
boolean isForOutgoing,
- int version) throws IOException
+ int version,
+ boolean keepSSTableLevel,
+ boolean isIncremental) throws IOException
{
StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
if (future == null)
@@@ -111,10 -110,10 +112,10 @@@
logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
- future = new StreamResultFuture(planId, description);
+ future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental);
StreamManager.instance.registerReceiving(future);
}
- future.attachSocket(from, sessionIndex, socket, isForOutgoing, version);
+ future.attachConnection(from, sessionIndex, connection, isForOutgoing, version);
logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, description);
return future;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b448b33/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index e751968,abff812..2b16267
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -56,8 -50,9 +56,9 @@@ import org.apache.cassandra.db.marshal.
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableUtils;
+import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CounterId;