You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/08/22 20:55:18 UTC

[11/11] cassandra git commit: switch internode messaging to netty

switch internode messaging to netty

patch by jasobrown, reviewed by pcmanus for CASSANDRA-8457


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/356dc3c2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/356dc3c2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/356dc3c2

Branch: refs/heads/trunk
Commit: 356dc3c253224751cbf80b32cfce4e3c1640de11
Parents: 3d4a7e7
Author: Jason Brown <ja...@gmail.com>
Authored: Mon Feb 8 07:04:00 2016 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Aug 22 13:54:44 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   2 +-
 conf/cassandra-env.sh                           |   1 +
 lib/licenses/netty-4.1.14.txt                   | 202 ++++++
 lib/licenses/netty-all-4.0.44.Final.txt         | 202 ------
 lib/netty-all-4.0.44.Final.jar                  | Bin 2342652 -> 0 bytes
 lib/netty-all-4.1.14.Final.jar                  | Bin 0 -> 3690637 bytes
 .../org/apache/cassandra/config/Config.java     |   7 +-
 .../cassandra/config/DatabaseDescriptor.java    |  15 +
 .../cassandra/config/EncryptionOptions.java     |   4 +-
 src/java/org/apache/cassandra/db/TypeSizes.java |   6 +
 .../cassandra/locator/PropertyFileSnitch.java   |   2 +-
 .../locator/ReconnectableSnitchHelper.java      |  10 +-
 .../cassandra/metrics/ConnectionMetrics.java    |  27 +-
 .../cassandra/net/IncomingTcpConnection.java    | 197 -----
 .../org/apache/cassandra/net/MessageIn.java     |  35 +-
 .../org/apache/cassandra/net/MessageOut.java    | 128 +++-
 .../apache/cassandra/net/MessagingService.java  | 577 +++++++--------
 .../cassandra/net/OutboundTcpConnection.java    | 693 ------------------
 .../net/OutboundTcpConnectionPool.java          | 229 ------
 .../net/async/ByteBufDataInputPlus.java         |  31 +
 .../net/async/ByteBufDataOutputPlus.java        | 140 ++++
 .../cassandra/net/async/ChannelWriter.java      | 418 +++++++++++
 .../cassandra/net/async/ExpiredException.java   |  28 +
 .../cassandra/net/async/HandshakeProtocol.java  | 304 ++++++++
 .../net/async/InboundHandshakeHandler.java      | 293 ++++++++
 .../cassandra/net/async/MessageInHandler.java   | 314 ++++++++
 .../cassandra/net/async/MessageOutHandler.java  | 324 +++++++++
 .../cassandra/net/async/MessageResult.java      |  51 ++
 .../cassandra/net/async/NettyFactory.java       | 375 ++++++++++
 .../net/async/OutboundConnectionIdentifier.java | 161 +++++
 .../net/async/OutboundConnectionParams.java     | 202 ++++++
 .../net/async/OutboundHandshakeHandler.java     | 255 +++++++
 .../net/async/OutboundMessagingConnection.java  | 716 +++++++++++++++++++
 .../net/async/OutboundMessagingPool.java        | 173 +++++
 .../cassandra/net/async/QueuedMessage.java      |  75 ++
 .../apache/cassandra/security/SSLFactory.java   | 222 +++---
 .../streaming/DefaultConnectionFactory.java     |  31 +-
 .../org/apache/cassandra/tracing/Tracing.java   |   3 +-
 .../apache/cassandra/tracing/TracingImpl.java   |   3 +-
 .../org/apache/cassandra/transport/Message.java |   4 +-
 .../org/apache/cassandra/transport/Server.java  |  25 +-
 .../cassandra/transport/SimpleClient.java       |  18 +-
 .../cassandra/utils/CoalescingStrategies.java   | 406 ++++-------
 .../org/apache/cassandra/utils/FBUtilities.java |   7 +
 .../apache/cassandra/utils/NativeLibrary.java   |   2 +-
 test/conf/cassandra_ssl_test.keystore           | Bin 0 -> 2281 bytes
 test/conf/cassandra_ssl_test.truststore         | Bin 0 -> 992 bytes
 .../apache/cassandra/db/ReadCommandTest.java    |  33 +
 .../apache/cassandra/locator/EC2SnitchTest.java |  20 -
 .../cassandra/net/MessagingServiceTest.java     | 120 +++-
 .../net/OutboundTcpConnectionTest.java          | 175 -----
 .../net/async/ByteBufDataOutputPlusTest.java    | 178 +++++
 .../cassandra/net/async/ChannelWriterTest.java  | 312 ++++++++
 .../net/async/HandshakeHandlersTest.java        | 204 ++++++
 .../net/async/HandshakeProtocolTest.java        |  95 +++
 .../net/async/InboundHandshakeHandlerTest.java  | 289 ++++++++
 .../net/async/MessageInHandlerTest.java         | 242 +++++++
 .../net/async/MessageOutHandlerTest.java        | 289 ++++++++
 .../cassandra/net/async/NettyFactoryTest.java   | 300 ++++++++
 .../NonSendingOutboundMessagingConnection.java  |  42 ++
 .../net/async/OutboundConnectionParamsTest.java |  36 +
 .../net/async/OutboundHandshakeHandlerTest.java | 209 ++++++
 .../async/OutboundMessagingConnectionTest.java  | 519 ++++++++++++++
 .../net/async/OutboundMessagingPoolTest.java    | 149 ++++
 .../cassandra/net/async/TestAuthenticator.java  |  42 ++
 .../RepairMessageSerializationsTest.java        |   2 +
 .../cassandra/security/SSLFactoryTest.java      | 136 +++-
 .../streaming/StreamingTransferTest.java        |  29 +-
 .../utils/CoalescingStrategiesTest.java         | 453 ++----------
 70 files changed, 8024 insertions(+), 2769 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 75a4be9..f2e643e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Use netty for internode messaging (CASSANDRA-8457)
  * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
  * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758)
  * Fix pending repair manager index out of bounds check (CASSANDRA-13769)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index e033bb6..ee22921 100644
--- a/build.xml
+++ b/build.xml
@@ -420,7 +420,7 @@
           <dependency groupId="com.addthis.metrics" artifactId="reporter-config3" version="3.0.3" />
           <dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
           <dependency groupId="io.airlift" artifactId="airline" version="0.7" />
-          <dependency groupId="io.netty" artifactId="netty-all" version="4.0.44.Final" />
+          <dependency groupId="io.netty" artifactId="netty-all" version="4.1.14.Final" />
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
           <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="3.0.1" classifier="shaded">

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/conf/cassandra-env.sh
----------------------------------------------------------------------
diff --git a/conf/cassandra-env.sh b/conf/cassandra-env.sh
index 5a02f79..347fbf3 100644
--- a/conf/cassandra-env.sh
+++ b/conf/cassandra-env.sh
@@ -293,3 +293,4 @@ JVM_OPTS="$JVM_OPTS -Djava.library.path=$CASSANDRA_HOME/lib/sigar-bin"
 JVM_OPTS="$JVM_OPTS $MX4J_ADDRESS"
 JVM_OPTS="$JVM_OPTS $MX4J_PORT"
 JVM_OPTS="$JVM_OPTS $JVM_EXTRA_OPTS"
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/lib/licenses/netty-4.1.14.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/netty-4.1.14.txt b/lib/licenses/netty-4.1.14.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/lib/licenses/netty-4.1.14.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/lib/licenses/netty-all-4.0.44.Final.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/netty-all-4.0.44.Final.txt b/lib/licenses/netty-all-4.0.44.Final.txt
deleted file mode 100644
index d645695..0000000
--- a/lib/licenses/netty-all-4.0.44.Final.txt
+++ /dev/null
@@ -1,202 +0,0 @@
-
-                                 Apache License
-                           Version 2.0, January 2004
-                        http://www.apache.org/licenses/
-
-   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
-   1. Definitions.
-
-      "License" shall mean the terms and conditions for use, reproduction,
-      and distribution as defined by Sections 1 through 9 of this document.
-
-      "Licensor" shall mean the copyright owner or entity authorized by
-      the copyright owner that is granting the License.
-
-      "Legal Entity" shall mean the union of the acting entity and all
-      other entities that control, are controlled by, or are under common
-      control with that entity. For the purposes of this definition,
-      "control" means (i) the power, direct or indirect, to cause the
-      direction or management of such entity, whether by contract or
-      otherwise, or (ii) ownership of fifty percent (50%) or more of the
-      outstanding shares, or (iii) beneficial ownership of such entity.
-
-      "You" (or "Your") shall mean an individual or Legal Entity
-      exercising permissions granted by this License.
-
-      "Source" form shall mean the preferred form for making modifications,
-      including but not limited to software source code, documentation
-      source, and configuration files.
-
-      "Object" form shall mean any form resulting from mechanical
-      transformation or translation of a Source form, including but
-      not limited to compiled object code, generated documentation,
-      and conversions to other media types.
-
-      "Work" shall mean the work of authorship, whether in Source or
-      Object form, made available under the License, as indicated by a
-      copyright notice that is included in or attached to the work
-      (an example is provided in the Appendix below).
-
-      "Derivative Works" shall mean any work, whether in Source or Object
-      form, that is based on (or derived from) the Work and for which the
-      editorial revisions, annotations, elaborations, or other modifications
-      represent, as a whole, an original work of authorship. For the purposes
-      of this License, Derivative Works shall not include works that remain
-      separable from, or merely link (or bind by name) to the interfaces of,
-      the Work and Derivative Works thereof.
-
-      "Contribution" shall mean any work of authorship, including
-      the original version of the Work and any modifications or additions
-      to that Work or Derivative Works thereof, that is intentionally
-      submitted to Licensor for inclusion in the Work by the copyright owner
-      or by an individual or Legal Entity authorized to submit on behalf of
-      the copyright owner. For the purposes of this definition, "submitted"
-      means any form of electronic, verbal, or written communication sent
-      to the Licensor or its representatives, including but not limited to
-      communication on electronic mailing lists, source code control systems,
-      and issue tracking systems that are managed by, or on behalf of, the
-      Licensor for the purpose of discussing and improving the Work, but
-      excluding communication that is conspicuously marked or otherwise
-      designated in writing by the copyright owner as "Not a Contribution."
-
-      "Contributor" shall mean Licensor and any individual or Legal Entity
-      on behalf of whom a Contribution has been received by Licensor and
-      subsequently incorporated within the Work.
-
-   2. Grant of Copyright License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      copyright license to reproduce, prepare Derivative Works of,
-      publicly display, publicly perform, sublicense, and distribute the
-      Work and such Derivative Works in Source or Object form.
-
-   3. Grant of Patent License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      (except as stated in this section) patent license to make, have made,
-      use, offer to sell, sell, import, and otherwise transfer the Work,
-      where such license applies only to those patent claims licensable
-      by such Contributor that are necessarily infringed by their
-      Contribution(s) alone or by combination of their Contribution(s)
-      with the Work to which such Contribution(s) was submitted. If You
-      institute patent litigation against any entity (including a
-      cross-claim or counterclaim in a lawsuit) alleging that the Work
-      or a Contribution incorporated within the Work constitutes direct
-      or contributory patent infringement, then any patent licenses
-      granted to You under this License for that Work shall terminate
-      as of the date such litigation is filed.
-
-   4. Redistribution. You may reproduce and distribute copies of the
-      Work or Derivative Works thereof in any medium, with or without
-      modifications, and in Source or Object form, provided that You
-      meet the following conditions:
-
-      (a) You must give any other recipients of the Work or
-          Derivative Works a copy of this License; and
-
-      (b) You must cause any modified files to carry prominent notices
-          stating that You changed the files; and
-
-      (c) You must retain, in the Source form of any Derivative Works
-          that You distribute, all copyright, patent, trademark, and
-          attribution notices from the Source form of the Work,
-          excluding those notices that do not pertain to any part of
-          the Derivative Works; and
-
-      (d) If the Work includes a "NOTICE" text file as part of its
-          distribution, then any Derivative Works that You distribute must
-          include a readable copy of the attribution notices contained
-          within such NOTICE file, excluding those notices that do not
-          pertain to any part of the Derivative Works, in at least one
-          of the following places: within a NOTICE text file distributed
-          as part of the Derivative Works; within the Source form or
-          documentation, if provided along with the Derivative Works; or,
-          within a display generated by the Derivative Works, if and
-          wherever such third-party notices normally appear. The contents
-          of the NOTICE file are for informational purposes only and
-          do not modify the License. You may add Your own attribution
-          notices within Derivative Works that You distribute, alongside
-          or as an addendum to the NOTICE text from the Work, provided
-          that such additional attribution notices cannot be construed
-          as modifying the License.
-
-      You may add Your own copyright statement to Your modifications and
-      may provide additional or different license terms and conditions
-      for use, reproduction, or distribution of Your modifications, or
-      for any such Derivative Works as a whole, provided Your use,
-      reproduction, and distribution of the Work otherwise complies with
-      the conditions stated in this License.
-
-   5. Submission of Contributions. Unless You explicitly state otherwise,
-      any Contribution intentionally submitted for inclusion in the Work
-      by You to the Licensor shall be under the terms and conditions of
-      this License, without any additional terms or conditions.
-      Notwithstanding the above, nothing herein shall supersede or modify
-      the terms of any separate license agreement you may have executed
-      with Licensor regarding such Contributions.
-
-   6. Trademarks. This License does not grant permission to use the trade
-      names, trademarks, service marks, or product names of the Licensor,
-      except as required for reasonable and customary use in describing the
-      origin of the Work and reproducing the content of the NOTICE file.
-
-   7. Disclaimer of Warranty. Unless required by applicable law or
-      agreed to in writing, Licensor provides the Work (and each
-      Contributor provides its Contributions) on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-      implied, including, without limitation, any warranties or conditions
-      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
-      PARTICULAR PURPOSE. You are solely responsible for determining the
-      appropriateness of using or redistributing the Work and assume any
-      risks associated with Your exercise of permissions under this License.
-
-   8. Limitation of Liability. In no event and under no legal theory,
-      whether in tort (including negligence), contract, or otherwise,
-      unless required by applicable law (such as deliberate and grossly
-      negligent acts) or agreed to in writing, shall any Contributor be
-      liable to You for damages, including any direct, indirect, special,
-      incidental, or consequential damages of any character arising as a
-      result of this License or out of the use or inability to use the
-      Work (including but not limited to damages for loss of goodwill,
-      work stoppage, computer failure or malfunction, or any and all
-      other commercial damages or losses), even if such Contributor
-      has been advised of the possibility of such damages.
-
-   9. Accepting Warranty or Additional Liability. While redistributing
-      the Work or Derivative Works thereof, You may choose to offer,
-      and charge a fee for, acceptance of support, warranty, indemnity,
-      or other liability obligations and/or rights consistent with this
-      License. However, in accepting such obligations, You may act only
-      on Your own behalf and on Your sole responsibility, not on behalf
-      of any other Contributor, and only if You agree to indemnify,
-      defend, and hold each Contributor harmless for any liability
-      incurred by, or claims asserted against, such Contributor by reason
-      of your accepting any such warranty or additional liability.
-
-   END OF TERMS AND CONDITIONS
-
-   APPENDIX: How to apply the Apache License to your work.
-
-      To apply the Apache License to your work, attach the following
-      boilerplate notice, with the fields enclosed by brackets "[]"
-      replaced with your own identifying information. (Don't include
-      the brackets!)  The text should be enclosed in the appropriate
-      comment syntax for the file format. We also recommend that a
-      file or class name and description of purpose be included on the
-      same "printed page" as the copyright notice for easier
-      identification within third-party archives.
-
-   Copyright [yyyy] [name of copyright owner]
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/lib/netty-all-4.0.44.Final.jar
----------------------------------------------------------------------
diff --git a/lib/netty-all-4.0.44.Final.jar b/lib/netty-all-4.0.44.Final.jar
deleted file mode 100644
index 9c5bda5..0000000
Binary files a/lib/netty-all-4.0.44.Final.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/lib/netty-all-4.1.14.Final.jar
----------------------------------------------------------------------
diff --git a/lib/netty-all-4.1.14.Final.jar b/lib/netty-all-4.1.14.Final.jar
new file mode 100644
index 0000000..e5c8137
Binary files /dev/null and b/lib/netty-all-4.1.14.Final.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 5a45282..77d5bf4 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -276,6 +276,7 @@ public class Config
     public int tracetype_repair_ttl = (int) TimeUnit.DAYS.toSeconds(7);
 
     /**
+<<<<<<< HEAD
      * Maintain statistics on whether writes achieve the ideal consistency level
      * before expiring and becoming hints
      */
@@ -283,8 +284,12 @@ public class Config
 
     /*
      * Strategy to use for coalescing messages in OutboundTcpConnection.
+=======
+     * Strategy to use for coalescing messages in {@link OutboundMessagingPool}.
+>>>>>>> 2acc8dbc74... switch internode messaging to netty
      * Can be fixed, movingaverage, timehorizon, disabled. Setting is case and leading/trailing
-     * whitespace insensitive. You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+     * whitespace insensitive. You can also specify a subclass of
+     * {@link org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy} by name.
      */
     public String otc_coalescing_strategy = "DISABLED";
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index fb50826..53bac93 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1567,6 +1567,11 @@ public class DatabaseDescriptor
         return conf.listen_on_broadcast_address;
     }
 
+    public static void setListenOnBroadcastAddress(boolean listen_on_broadcast_address)
+    {
+        conf.listen_on_broadcast_address = listen_on_broadcast_address;
+    }
+
     public static IInternodeAuthenticator getInternodeAuthenticator()
     {
         return internodeAuthenticator;
@@ -2095,6 +2100,11 @@ public class DatabaseDescriptor
         return conf.internode_compression;
     }
 
+    public static void setInternodeCompression(Config.InternodeCompression compression)
+    {
+        conf.internode_compression = compression;
+    }
+
     public static boolean getInterDCTcpNoDelay()
     {
         return conf.inter_dc_tcp_nodelay;
@@ -2156,6 +2166,11 @@ public class DatabaseDescriptor
         return conf.otc_coalescing_strategy;
     }
 
+    public static void setOtcCoalescingStrategy(String strategy)
+    {
+        conf.otc_coalescing_strategy = strategy;
+    }
+
     public static int getOtcCoalescingWindow()
     {
         return conf.otc_coalescing_window_us;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/config/EncryptionOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/EncryptionOptions.java b/src/java/org/apache/cassandra/config/EncryptionOptions.java
index d662871..6010746 100644
--- a/src/java/org/apache/cassandra/config/EncryptionOptions.java
+++ b/src/java/org/apache/cassandra/config/EncryptionOptions.java
@@ -17,15 +17,13 @@
  */
 package org.apache.cassandra.config;
 
-import javax.net.ssl.SSLSocketFactory;
-
 public abstract class EncryptionOptions
 {
     public String keystore = "conf/.keystore";
     public String keystore_password = "cassandra";
     public String truststore = "conf/.truststore";
     public String truststore_password = "cassandra";
-    public String[] cipher_suites = ((SSLSocketFactory)SSLSocketFactory.getDefault()).getDefaultCipherSuites();
+    public String[] cipher_suites = {};
     public String protocol = "TLS";
     public String algorithm = "SunX509";
     public String store_type = "JKS";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/db/TypeSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TypeSizes.java b/src/java/org/apache/cassandra/db/TypeSizes.java
index 6543025..b47e300 100644
--- a/src/java/org/apache/cassandra/db/TypeSizes.java
+++ b/src/java/org/apache/cassandra/db/TypeSizes.java
@@ -28,6 +28,7 @@ public final class TypeSizes
     private TypeSizes(){}
 
     private static final int BOOL_SIZE = 1;
+    private static final int BYTE_SIZE = 1;
     private static final int SHORT_SIZE = 2;
     private static final int INT_SIZE = 4;
     private static final int LONG_SIZE = 8;
@@ -78,6 +79,11 @@ public final class TypeSizes
         return BOOL_SIZE;
     }
 
+    public static int sizeof(byte value)
+    {
+        return BYTE_SIZE;
+    }
+
     public static int sizeof(short value)
     {
         return SHORT_SIZE;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
index 8cc6549..2908976 100644
--- a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
@@ -192,7 +192,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
             throw new ConfigurationException(String.format("Snitch definitions at %s do not define a location for " +
                                                            "this node's broadcast address %s, nor does it provides a default",
                                                            SNITCH_PROPERTIES_FILENAME, broadcastAddress));
-        // OutboundTcpConnectionPool.getEndpoint() converts our broadcast address to local,
+        // internode messaging code converts our broadcast address to local,
         // make sure we can be found at that as well.
         InetAddress localAddress = FBUtilities.getLocalAddress();
         if (!localAddress.equals(broadcastAddress) && !reloadedMap.containsKey(localAddress))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
index 08f0a14..2235c76 100644
--- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
+++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
@@ -23,9 +23,9 @@ import java.net.UnknownHostException;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.OutboundTcpConnectionPool;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,18 +64,16 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
     @VisibleForTesting
     static void reconnect(InetAddress publicAddress, InetAddress localAddress, IEndpointSnitch snitch, String localDc)
     {
-        OutboundTcpConnectionPool cp = MessagingService.instance().getConnectionPool(publicAddress);
-        //InternodeAuthenticator said don't connect
-        if (cp == null)
+        if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress, MessagingService.portFor(publicAddress)))
         {
             logger.debug("InternodeAuthenticator said don't reconnect to {} on {}", publicAddress, localAddress);
             return;
         }
 
         if (snitch.getDatacenter(publicAddress).equals(localDc)
-                && !cp.endPoint().equals(localAddress))
+                && !MessagingService.instance().getCurrentEndpoint(publicAddress).equals(localAddress))
         {
-            cp.reset(localAddress);
+            MessagingService.instance().reconnectWithNewIp(publicAddress, localAddress);
             logger.debug("Initiated reconnect to an Internal IP {} for the {}", localAddress, publicAddress);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
index f01c06d..7815784 100644
--- a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
@@ -21,14 +21,12 @@ import java.net.InetAddress;
 
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
+import org.apache.cassandra.net.async.OutboundMessagingPool;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
-
-import org.apache.cassandra.net.OutboundTcpConnectionPool;
-
 /**
- * Metrics for {@link OutboundTcpConnectionPool}.
+ * Metrics for internode connections.
  */
 public class ConnectionMetrics
 {
@@ -66,9 +64,8 @@ public class ConnectionMetrics
      * Create metrics for given connection pool.
      *
      * @param ip IP address to use for metrics label
-     * @param connectionPool Connection pool
      */
-    public ConnectionMetrics(InetAddress ip, final OutboundTcpConnectionPool connectionPool)
+    public ConnectionMetrics(InetAddress ip, final OutboundMessagingPool messagingPool)
     {
         // ipv6 addresses will contain colons, which are invalid in a JMX ObjectName
         address = ip.getHostAddress().replace(':', '.');
@@ -79,63 +76,63 @@ public class ConnectionMetrics
         {
             public Integer getValue()
             {
-                return connectionPool.largeMessages.getPendingMessages();
+                return messagingPool.largeMessageChannel.getPendingMessages();
             }
         });
         largeMessageCompletedTasks = Metrics.register(factory.createMetricName("LargeMessageCompletedTasks"), new Gauge<Long>()
         {
             public Long getValue()
             {
-                return connectionPool.largeMessages.getCompletedMesssages();
+                return messagingPool.largeMessageChannel.getCompletedMessages();
             }
         });
         largeMessageDroppedTasks = Metrics.register(factory.createMetricName("LargeMessageDroppedTasks"), new Gauge<Long>()
         {
             public Long getValue()
             {
-                return connectionPool.largeMessages.getDroppedMessages();
+                return messagingPool.largeMessageChannel.getDroppedMessages();
             }
         });
         smallMessagePendingTasks = Metrics.register(factory.createMetricName("SmallMessagePendingTasks"), new Gauge<Integer>()
         {
             public Integer getValue()
             {
-                return connectionPool.smallMessages.getPendingMessages();
+                return messagingPool.smallMessageChannel.getPendingMessages();
             }
         });
         smallMessageCompletedTasks = Metrics.register(factory.createMetricName("SmallMessageCompletedTasks"), new Gauge<Long>()
         {
             public Long getValue()
             {
-                return connectionPool.smallMessages.getCompletedMesssages();
+                return messagingPool.smallMessageChannel.getCompletedMessages();
             }
         });
         smallMessageDroppedTasks = Metrics.register(factory.createMetricName("SmallMessageDroppedTasks"), new Gauge<Long>()
         {
             public Long getValue()
             {
-                return connectionPool.smallMessages.getDroppedMessages();
+                return messagingPool.smallMessageChannel.getDroppedMessages();
             }
         });
         gossipMessagePendingTasks = Metrics.register(factory.createMetricName("GossipMessagePendingTasks"), new Gauge<Integer>()
         {
             public Integer getValue()
             {
-                return connectionPool.gossipMessages.getPendingMessages();
+                return messagingPool.gossipChannel.getPendingMessages();
             }
         });
         gossipMessageCompletedTasks = Metrics.register(factory.createMetricName("GossipMessageCompletedTasks"), new Gauge<Long>()
         {
             public Long getValue()
             {
-                return connectionPool.gossipMessages.getCompletedMesssages();
+                return messagingPool.gossipChannel.getCompletedMessages();
             }
         });
         gossipMessageDroppedTasks = Metrics.register(factory.createMetricName("GossipMessageDroppedTasks"), new Gauge<Long>()
         {
             public Long getValue()
             {
-                return connectionPool.gossipMessages.getDroppedMessages();
+                return messagingPool.gossipChannel.getDroppedMessages();
             }
         });
         timeouts = Metrics.meter(factory.createMetricName("Timeouts"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
deleted file mode 100644
index 67e54c8..0000000
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.zip.Checksum;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.util.concurrent.FastThreadLocalThread;
-import net.jpountz.lz4.LZ4BlockInputStream;
-import net.jpountz.lz4.LZ4FastDecompressor;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.xxhash.XXHashFactory;
-
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.exceptions.UnknownTableException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.monitoring.ApproximateTime;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
-import org.apache.cassandra.io.util.NIODataInputStream;
-
-public class IncomingTcpConnection extends FastThreadLocalThread implements Closeable
-{
-    private static final Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class);
-
-    private static final int BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + ".itc_buffer_size", 1024 * 4);
-
-    private final int version;
-    private final boolean compressed;
-    private final Socket socket;
-    private final Set<Closeable> group;
-    public InetAddress from;
-
-    public IncomingTcpConnection(int version, boolean compressed, Socket socket, Set<Closeable> group)
-    {
-        super("MessagingService-Incoming-" + socket.getInetAddress());
-        this.version = version;
-        this.compressed = compressed;
-        this.socket = socket;
-        this.group = group;
-        if (DatabaseDescriptor.getInternodeRecvBufferSize() > 0)
-        {
-            try
-            {
-                this.socket.setReceiveBufferSize(DatabaseDescriptor.getInternodeRecvBufferSize());
-            }
-            catch (SocketException se)
-            {
-                logger.warn("Failed to set receive buffer size on internode socket.", se);
-            }
-        }
-    }
-
-    /**
-     * A new connection will either stream or message for its entire lifetime: because streaming
-     * bypasses the InputStream implementations to use sendFile, we cannot begin buffering until
-     * we've determined the type of the connection.
-     */
-    @Override
-    public void run()
-    {
-        try
-        {
-            if (version < MessagingService.VERSION_30)
-                throw new UnsupportedOperationException(String.format("Unable to read obsolete message version %s; "
-                                                                      + "The earliest version supported is 3.0.0",
-                                                                      version));
-
-            receiveMessages();
-        }
-        catch (EOFException e)
-        {
-            logger.trace("eof reading from socket; closing", e);
-            // connection will be reset so no need to throw an exception.
-        }
-        catch (UnknownTableException e)
-        {
-            logger.warn("UnknownTableException reading from socket; closing", e);
-        }
-        catch (IOException e)
-        {
-            logger.trace("IOException reading from socket; closing", e);
-        }
-        finally
-        {
-            close();
-        }
-    }
-
-    @Override
-    public void close()
-    {
-        try
-        {
-            if (logger.isTraceEnabled())
-                logger.trace("Closing socket {} - isclosed: {}", socket, socket.isClosed());
-            if (!socket.isClosed())
-            {
-                socket.close();
-            }
-        }
-        catch (IOException e)
-        {
-            logger.trace("Error closing socket", e);
-        }
-        finally
-        {
-            group.remove(this);
-        }
-    }
-
-    @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the stream needs to remain open.
-    private void receiveMessages() throws IOException
-    {
-        // handshake (true) endpoint versions
-        DataOutputStream out = new DataOutputStream(socket.getOutputStream());
-        // if this version is < the MS version the other node is trying
-        // to connect with, the other node will disconnect
-        out.writeInt(MessagingService.current_version);
-        out.flush();
-        DataInputPlus in = new DataInputStreamPlus(socket.getInputStream());
-        int maxVersion = in.readInt();
-        // outbound side will reconnect if necessary to upgrade version
-        assert version <= MessagingService.current_version;
-        from = CompactEndpointSerializationHelper.deserialize(in);
-        // record the (true) version of the endpoint
-        MessagingService.instance().setVersion(from, maxVersion);
-        logger.trace("Set version for {} to {} (will use {})", from, maxVersion, MessagingService.instance().getVersion(from));
-
-        if (compressed)
-        {
-            logger.trace("Upgrading incoming connection to be compressed");
-            LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
-            Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(OutboundTcpConnection.LZ4_HASH_SEED).asChecksum();
-            in = new DataInputStreamPlus(new LZ4BlockInputStream(socket.getInputStream(),
-                                                             decompressor,
-                                                             checksum));
-        }
-        else
-        {
-            ReadableByteChannel channel = socket.getChannel();
-            in = new NIODataInputStream(channel != null ? channel : Channels.newChannel(socket.getInputStream()), BUFFER_SIZE);
-        }
-
-        while (true)
-        {
-            MessagingService.validateMagic(in.readInt());
-            receiveMessage(in, version);
-        }
-    }
-
-    private InetAddress receiveMessage(DataInputPlus input, int version) throws IOException
-    {
-        int id = input.readInt();
-
-        long currentTime = ApproximateTime.currentTimeMillis();
-        MessageIn message = MessageIn.read(input, version, id, MessageIn.readConstructionTime(from, input, currentTime));
-        if (message == null)
-        {
-            // callback expired; nothing to do
-            return null;
-        }
-        if (version <= MessagingService.current_version)
-        {
-            MessagingService.instance().receive(message, id);
-        }
-        else
-        {
-            logger.trace("Received connection from newer protocol version {}. Ignoring message", version);
-        }
-        return message.from;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index 8774d38..d520fa9 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -31,20 +31,27 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.net.MessagingService.Verb;
 
+/**
+ * The receiving node's view of a {@link MessageOut}. See documentation on {@link MessageOut} for details on the
+ * serialization format.
+ *
+ * @param <T> The type of the payload
+ */
 public class MessageIn<T>
 {
     public final InetAddress from;
     public final T payload;
     public final Map<String, byte[]> parameters;
-    public final MessagingService.Verb verb;
+    public final Verb verb;
     public final int version;
     public final long constructionTime;
 
     private MessageIn(InetAddress from,
                       T payload,
                       Map<String, byte[]> parameters,
-                      MessagingService.Verb verb,
+                      Verb verb,
                       int version,
                       long constructionTime)
     {
@@ -59,7 +66,7 @@ public class MessageIn<T>
     public static <T> MessageIn<T> create(InetAddress from,
                                           T payload,
                                           Map<String, byte[]> parameters,
-                                          MessagingService.Verb verb,
+                                          Verb verb,
                                           int version,
                                           long constructionTime)
     {
@@ -85,11 +92,17 @@ public class MessageIn<T>
         InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
 
         MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
+        Map<String, byte[]> parameters = readParameters(in);
+        int payloadSize = in.readInt();
+        return read(in, version, id, constructionTime, from, payloadSize, verb, parameters);
+    }
+
+    public static Map<String, byte[]> readParameters(DataInputPlus in) throws IOException
+    {
         int parameterCount = in.readInt();
-        Map<String, byte[]> parameters;
         if (parameterCount == 0)
         {
-            parameters = Collections.emptyMap();
+            return Collections.emptyMap();
         }
         else
         {
@@ -101,10 +114,13 @@ public class MessageIn<T>
                 in.readFully(value);
                 builder.put(key, value);
             }
-            parameters = builder.build();
+            return builder.build();
         }
+    }
 
-        int payloadSize = in.readInt();
+    public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, long constructionTime,
+                                          InetAddress from, int payloadSize, Verb verb, Map<String, byte[]> parameters) throws IOException
+    {
         IVersionedSerializer<T2> serializer = (IVersionedSerializer<T2>) MessagingService.verbSerializers.get(verb);
         if (serializer instanceof MessagingService.CallbackDeterminedSerializer)
         {
@@ -124,12 +140,11 @@ public class MessageIn<T>
         return MessageIn.create(from, payload, parameters, verb, version, constructionTime);
     }
 
-    public static long readConstructionTime(InetAddress from, DataInputPlus input, long currentTime) throws IOException
+    public static long deriveConstructionTime(InetAddress from, int messageTimestamp, long currentTime)
     {
         // Reconstruct the message construction time sent by the remote host (we sent only the lower 4 bytes, assuming the
         // higher 4 bytes wouldn't change between the sender and receiver)
-        int partial = input.readInt(); // make sure to readInt, even if cross_node_to is not enabled
-        long sentConstructionTime = (currentTime & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+        long sentConstructionTime = (currentTime & 0xFFFFFFFF00000000L) | (((messageTimestamp & 0xFFFFFFFFL) << 2) >> 2);
 
         // Because nodes may not have their clock perfectly in sync, it's actually possible the sentConstructionTime is
         // later than the currentTime (the received time). If that's the case, as we definitively know there is a lack

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index a38aed5..379aff5 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -29,21 +29,78 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
 import static org.apache.cassandra.tracing.Tracing.isTracing;
 
+/**
+ * Each message contains a header with several fixed fields, an optional key-value parameters section, and then
+ * the message payload itself. Note: the IP address in the header may be either IPv4 (4 bytes) or IPv6 (16 bytes).
+ * The diagram below shows the IPv4 address for brevity.
+ *
+ * <pre>
+ * {@code
+ *            1 1 1 1 1 2 2 2 2 2 3 3 3 3 3 4 4 4 4 4 5 5 5 5 5 6 6
+ *  0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                       PROTOCOL MAGIC                          |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                         Message ID                            |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                         Timestamp                             |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |  Addr len |           IP Address (IPv4)                       /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * /           |                 Verb                              /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * /           |            Parameters size                        /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * /           |             Parameter data                        /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * /                                                               |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                        Payload size                           |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                                                               /
+ * /                           Payload                             /
+ * /                                                               |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * }
+ * </pre>
+ *
+ * An individual parameter has a String key and a byte array value. The key is serialized with its length,
+ * encoded as two bytes, followed by the UTF-8 byte encoding of the string (see {@link java.io.DataOutput#writeUTF(java.lang.String)}).
+ * The value is serialized with its length, encoded as four bytes, followed by the bytes of the value.
+ *
+ * @param <T> The type of the message payload.
+ */
 public class MessageOut<T>
 {
+    private static final int SERIALIZED_SIZE_VERSION_UNDEFINED = -1;
+
     public final InetAddress from;
     public final MessagingService.Verb verb;
     public final T payload;
     public final IVersionedSerializer<T> serializer;
     public final Map<String, byte[]> parameters;
-    private long payloadSize = -1;
-    private int payloadSizeVersion = -1;
+
+    /**
+     * Memoization of the serialized size of just the payload.
+     */
+    private int payloadSerializedSize = -1;
+
+    /**
+     * Memoization of the serialized size of the entire message.
+     */
+    private int serializedSize = -1;
+
+    /**
+     * The internode protocol messaging version that was used to calculate the memoized serialized sizes.
+     */
+    private int serializedSizeVersion = SERIALIZED_SIZE_VERSION_UNDEFINED;
 
     // we do support messages that just consist of a verb
     public MessageOut(MessagingService.Verb verb)
@@ -115,14 +172,12 @@ public class MessageOut<T>
 
         if (payload != null)
         {
-            try(DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
-            {
-                serializer.serialize(payload, dob, version);
-
-                int size = dob.getLength();
-                out.writeInt(size);
-                out.write(dob.getData(), 0, size);
-            }
+            int payloadSize = payloadSerializedSize >= 0
+                              ? (int)payloadSerializedSize
+                              : (int) serializer.serializedSize(payload, version);
+
+            out.writeInt(payloadSize);
+            serializer.serialize(payload, out, version);
         }
         else
         {
@@ -130,9 +185,9 @@ public class MessageOut<T>
         }
     }
 
-    public int serializedSize(int version)
+    private Pair<Long, Long> calculateSerializedSize(int version)
     {
-        int size = CompactEndpointSerializationHelper.serializedSize(from);
+        long size = CompactEndpointSerializationHelper.serializedSize(from);
 
         size += TypeSizes.sizeof(verb.getId());
         size += TypeSizes.sizeof(parameters.size());
@@ -143,38 +198,43 @@ public class MessageOut<T>
             size += entry.getValue().length;
         }
 
-        long longSize = payloadSize(version);
-        assert longSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages
-        size += TypeSizes.sizeof((int) longSize);
-        size += longSize;
-        return size;
+        long payloadSize = payload == null ? 0 : serializer.serializedSize(payload, version);
+        assert payloadSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages
+        size += TypeSizes.sizeof((int) payloadSize);
+        size += payloadSize;
+        return Pair.create(size, payloadSize);
     }
 
     /**
-     * Calculate the size of the payload of this message for the specified protocol version
-     * and memoize the result for the specified protocol version. Memoization only covers the protocol
-     * version of the first invocation.
+     * Calculate the size of this message for the specified protocol version and memoize the result for the specified
+     * protocol version. Memoization only covers the protocol version of the first invocation.
      *
-     * It is not safe to call payloadSize concurrently from multiple threads unless it has already been invoked
+     * It is not safe to call this function concurrently from multiple threads unless it has already been invoked
      * once from a single thread and there is a happens before relationship between that invocation and other
-     * threads concurrently invoking payloadSize.
+     * threads concurrently invoking this function.
      *
      * For instance it would be safe to invokePayload size to make a decision in the thread that created the message
      * and then hand it off to other threads via a thread-safe queue, volatile write, or synchronized/ReentrantLock.
-     * @param version Protocol version to use when calculating payload size
-     * @return Size of the payload of this message in bytes
+     *
+     * @param version Protocol version to use when calculating size
+     * @return Size of this message in bytes, which will be less than or equal to {@link Integer#MAX_VALUE}
      */
-    public long payloadSize(int version)
+    public int serializedSize(int version)
     {
-        if (payloadSize == -1)
-        {
-            payloadSize = payload == null ? 0 : serializer.serializedSize(payload, version);
-            payloadSizeVersion = version;
-        }
-        else if (payloadSizeVersion != version)
+        if (serializedSize > 0 && serializedSizeVersion == version)
+            return serializedSize;
+
+        Pair<Long, Long> sizes = calculateSerializedSize(version);
+        if (sizes.left > Integer.MAX_VALUE)
+            throw new IllegalStateException("message size exceeds maximum allowed size: size = " + sizes.left);
+
+        if (serializedSizeVersion == SERIALIZED_SIZE_VERSION_UNDEFINED)
         {
-            return payload == null ? 0 : serializer.serializedSize(payload, version);
+            serializedSize = sizes.left.intValue();
+            payloadSerializedSize = sizes.right.intValue();
+            serializedSizeVersion = version;
         }
-        return payloadSize;
+
+        return sizes.left.intValue();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org