You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/06/28 08:28:08 UTC

[13/21] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3b448b33
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3b448b33
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3b448b33

Branch: refs/heads/trunk
Commit: 3b448b33785c86f554bc07266de1c39e0ac502d9
Parents: 6fcf335 2811f15
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 28 10:16:49 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 28 10:16:49 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../net/IncomingStreamingConnection.java        |  7 ++--
 .../apache/cassandra/net/MessagingService.java  | 12 +++++-
 .../cassandra/streaming/ConnectionHandler.java  | 39 +++++++++++++++-----
 .../cassandra/streaming/StreamResultFuture.java | 27 +++++++-------
 .../streaming/StreamingTransferTest.java        | 23 ++++++++++++
 6 files changed, 80 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b448b33/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8fbdf3b,5741241..8d2062d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,35 -1,5 +1,36 @@@
 -2.1.15
 +2.2.7
 + * Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
 + * Validate bloom_filter_fp_chance against lowest supported
 +   value when the table is created (CASSANDRA-11920)
 + * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013)
 + * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
 + * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
 + * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
 + * Run CommitLog tests with different compression settings (CASSANDRA-9039)
 + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
 + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
 + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
 + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
 + * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
 + * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
 + * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
 + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
 + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
 + * JSON datetime formatting needs timezone (CASSANDRA-11137)
 + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
 + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660)
 + * Add missing files to debian packages (CASSANDRA-11642)
 + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
 + * cqlsh: COPY FROM should use regular inserts for single statement batches and
 +   report errors correctly if workers processes crash on initialization (CASSANDRA-11474)
 + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
 + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
 +Merged from 2.1:
+  * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
   * Don't try to get sstables for non-repairing column families (CASSANDRA-12077)
   * Prevent select statements with clustering key > 64k (CASSANDRA-11882)
   * Avoid marking too many sstables as repaired (CASSANDRA-11696)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b448b33/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 79a814d,bfe92f9..ab262c7
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@@ -72,13 -71,11 +72,12 @@@ public class IncomingStreamingConnectio
              // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing.
              // Note: we cannot use the same socket for incoming and outgoing streams because we want to
              // parallelize said streams and the socket is blocking, so we might deadlock.
-             StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental);
 -            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version);
++            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental);
          }
          catch (IOException e)
          {
 -            logger.debug("IOException reading from socket; closing", e);
 +            logger.error(String.format("IOException while reading from socket from %s, closing: %s",
 +                                       socket.getRemoteSocketAddress(), e));
-             logger.trace(String.format("IOException while reading from socket from %s, closing", socket.getRemoteSocketAddress()), e);
              close();
          }
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b448b33/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 459923b,ac8ad79..8c8a333
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -1178,14 -1136,21 +1180,20 @@@ public final class MessagingService imp
          return result;
      }
  
 -    public Map<String, Long> getRecentTimeoutsPerHost()
 +    public static IPartitioner globalPartitioner()
      {
 -        Map<String, Long> result = new HashMap<String, Long>();
 -        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry: connectionManagers.entrySet())
 -        {
 -            String ip = entry.getKey().getHostAddress();
 -            long recent = entry.getValue().getRecentTimeouts();
 -            result.put(ip, recent);
 -        }
 -        return result;
 +        return DatabaseDescriptor.getPartitioner();
 +    }
 +
 +    public static void validatePartitioner(AbstractBounds<?> bounds)
 +    {
 +        if (globalPartitioner() != bounds.left.getPartitioner())
 +            throw new AssertionError();
      }
+ 
+     @VisibleForTesting
+     public List<SocketThread> getSocketThreads()
+     {
+         return socketThreads;
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b448b33/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d4662c7,60ce11e..c497a39
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@@ -34,12 -33,11 +34,13 @@@ import java.util.concurrent.atomic.Atom
  import com.google.common.util.concurrent.Futures;
  import com.google.common.util.concurrent.ListenableFuture;
  import com.google.common.util.concurrent.SettableFuture;
 +
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
 -
 -import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 +import org.apache.cassandra.io.util.DataOutputStreamPlus;
 +import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 +import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+ import org.apache.cassandra.net.IncomingStreamingConnection;
  import org.apache.cassandra.streaming.messages.StreamInitMessage;
  import org.apache.cassandra.streaming.messages.StreamMessage;
  import org.apache.cassandra.utils.FBUtilities;
@@@ -192,15 -188,17 +194,21 @@@ public class ConnectionHandle
                      session.sessionIndex(),
                      session.planId(),
                      session.description(),
 -                    isForOutgoing);
 +                    isForOutgoing,
 +                    session.keepSSTableLevel(),
 +                    session.isIncremental());
              ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
 -            getWriteChannel(socket).write(messageBuf);
 +            DataOutputStreamPlus out = getWriteChannel(socket);
 +            out.write(messageBuf);
 +            out.flush();
          }
  
+         public void start(IncomingStreamingConnection connection, int protocolVersion)
+         {
+             this.incomingConnection = connection;
+             start(connection.socket, protocolVersion);
+         }
+ 
          public void start(Socket socket, int protocolVersion)
          {
              this.socket = socket;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b448b33/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 99adab0,5c9c6de..b299b87
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@@ -99,11 -100,9 +100,11 @@@ public final class StreamResultFuture e
                                                                      UUID planId,
                                                                      String description,
                                                                      InetAddress from,
-                                                                     Socket socket,
+                                                                     IncomingStreamingConnection connection,
                                                                      boolean isForOutgoing,
 -                                                                    int version) throws IOException
 +                                                                    int version,
 +                                                                    boolean keepSSTableLevel,
 +                                                                    boolean isIncremental) throws IOException
      {
          StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
          if (future == null)
@@@ -111,10 -110,10 +112,10 @@@
              logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
  
              // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
 -            future = new StreamResultFuture(planId, description);
 +            future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental);
              StreamManager.instance.registerReceiving(future);
          }
-         future.attachSocket(from, sessionIndex, socket, isForOutgoing, version);
+         future.attachConnection(from, sessionIndex, connection, isForOutgoing, version);
          logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, description);
          return future;
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b448b33/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index e751968,abff812..2b16267
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -56,8 -50,9 +56,9 @@@ import org.apache.cassandra.db.marshal.
  import org.apache.cassandra.dht.IPartitioner;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.SSTableReader;
  import org.apache.cassandra.io.sstable.SSTableUtils;
 +import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.CounterId;