You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/08/22 20:55:18 UTC
[11/11] cassandra git commit: switch internode messaging to netty
switch internode messaging to netty
patch by jasobrown, reviewed by pcmanus for CASSANDRA-8457
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/356dc3c2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/356dc3c2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/356dc3c2
Branch: refs/heads/trunk
Commit: 356dc3c253224751cbf80b32cfce4e3c1640de11
Parents: 3d4a7e7
Author: Jason Brown <ja...@gmail.com>
Authored: Mon Feb 8 07:04:00 2016 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Aug 22 13:54:44 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 2 +-
conf/cassandra-env.sh | 1 +
lib/licenses/netty-4.1.14.txt | 202 ++++++
lib/licenses/netty-all-4.0.44.Final.txt | 202 ------
lib/netty-all-4.0.44.Final.jar | Bin 2342652 -> 0 bytes
lib/netty-all-4.1.14.Final.jar | Bin 0 -> 3690637 bytes
.../org/apache/cassandra/config/Config.java | 7 +-
.../cassandra/config/DatabaseDescriptor.java | 15 +
.../cassandra/config/EncryptionOptions.java | 4 +-
src/java/org/apache/cassandra/db/TypeSizes.java | 6 +
.../cassandra/locator/PropertyFileSnitch.java | 2 +-
.../locator/ReconnectableSnitchHelper.java | 10 +-
.../cassandra/metrics/ConnectionMetrics.java | 27 +-
.../cassandra/net/IncomingTcpConnection.java | 197 -----
.../org/apache/cassandra/net/MessageIn.java | 35 +-
.../org/apache/cassandra/net/MessageOut.java | 128 +++-
.../apache/cassandra/net/MessagingService.java | 577 +++++++--------
.../cassandra/net/OutboundTcpConnection.java | 693 ------------------
.../net/OutboundTcpConnectionPool.java | 229 ------
.../net/async/ByteBufDataInputPlus.java | 31 +
.../net/async/ByteBufDataOutputPlus.java | 140 ++++
.../cassandra/net/async/ChannelWriter.java | 418 +++++++++++
.../cassandra/net/async/ExpiredException.java | 28 +
.../cassandra/net/async/HandshakeProtocol.java | 304 ++++++++
.../net/async/InboundHandshakeHandler.java | 293 ++++++++
.../cassandra/net/async/MessageInHandler.java | 314 ++++++++
.../cassandra/net/async/MessageOutHandler.java | 324 +++++++++
.../cassandra/net/async/MessageResult.java | 51 ++
.../cassandra/net/async/NettyFactory.java | 375 ++++++++++
.../net/async/OutboundConnectionIdentifier.java | 161 +++++
.../net/async/OutboundConnectionParams.java | 202 ++++++
.../net/async/OutboundHandshakeHandler.java | 255 +++++++
.../net/async/OutboundMessagingConnection.java | 716 +++++++++++++++++++
.../net/async/OutboundMessagingPool.java | 173 +++++
.../cassandra/net/async/QueuedMessage.java | 75 ++
.../apache/cassandra/security/SSLFactory.java | 222 +++---
.../streaming/DefaultConnectionFactory.java | 31 +-
.../org/apache/cassandra/tracing/Tracing.java | 3 +-
.../apache/cassandra/tracing/TracingImpl.java | 3 +-
.../org/apache/cassandra/transport/Message.java | 4 +-
.../org/apache/cassandra/transport/Server.java | 25 +-
.../cassandra/transport/SimpleClient.java | 18 +-
.../cassandra/utils/CoalescingStrategies.java | 406 ++++-------
.../org/apache/cassandra/utils/FBUtilities.java | 7 +
.../apache/cassandra/utils/NativeLibrary.java | 2 +-
test/conf/cassandra_ssl_test.keystore | Bin 0 -> 2281 bytes
test/conf/cassandra_ssl_test.truststore | Bin 0 -> 992 bytes
.../apache/cassandra/db/ReadCommandTest.java | 33 +
.../apache/cassandra/locator/EC2SnitchTest.java | 20 -
.../cassandra/net/MessagingServiceTest.java | 120 +++-
.../net/OutboundTcpConnectionTest.java | 175 -----
.../net/async/ByteBufDataOutputPlusTest.java | 178 +++++
.../cassandra/net/async/ChannelWriterTest.java | 312 ++++++++
.../net/async/HandshakeHandlersTest.java | 204 ++++++
.../net/async/HandshakeProtocolTest.java | 95 +++
.../net/async/InboundHandshakeHandlerTest.java | 289 ++++++++
.../net/async/MessageInHandlerTest.java | 242 +++++++
.../net/async/MessageOutHandlerTest.java | 289 ++++++++
.../cassandra/net/async/NettyFactoryTest.java | 300 ++++++++
.../NonSendingOutboundMessagingConnection.java | 42 ++
.../net/async/OutboundConnectionParamsTest.java | 36 +
.../net/async/OutboundHandshakeHandlerTest.java | 209 ++++++
.../async/OutboundMessagingConnectionTest.java | 519 ++++++++++++++
.../net/async/OutboundMessagingPoolTest.java | 149 ++++
.../cassandra/net/async/TestAuthenticator.java | 42 ++
.../RepairMessageSerializationsTest.java | 2 +
.../cassandra/security/SSLFactoryTest.java | 136 +++-
.../streaming/StreamingTransferTest.java | 29 +-
.../utils/CoalescingStrategiesTest.java | 453 ++----------
70 files changed, 8024 insertions(+), 2769 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 75a4be9..f2e643e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Use netty for internode messaging (CASSANDRA-8457)
* Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
* Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758)
* Fix pending repair manager index out of bounds check (CASSANDRA-13769)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index e033bb6..ee22921 100644
--- a/build.xml
+++ b/build.xml
@@ -420,7 +420,7 @@
<dependency groupId="com.addthis.metrics" artifactId="reporter-config3" version="3.0.3" />
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
<dependency groupId="io.airlift" artifactId="airline" version="0.7" />
- <dependency groupId="io.netty" artifactId="netty-all" version="4.0.44.Final" />
+ <dependency groupId="io.netty" artifactId="netty-all" version="4.1.13.Final" />
<dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
<dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
<dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="3.0.1" classifier="shaded">
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/conf/cassandra-env.sh
----------------------------------------------------------------------
diff --git a/conf/cassandra-env.sh b/conf/cassandra-env.sh
index 5a02f79..347fbf3 100644
--- a/conf/cassandra-env.sh
+++ b/conf/cassandra-env.sh
@@ -293,3 +293,4 @@ JVM_OPTS="$JVM_OPTS -Djava.library.path=$CASSANDRA_HOME/lib/sigar-bin"
JVM_OPTS="$JVM_OPTS $MX4J_ADDRESS"
JVM_OPTS="$JVM_OPTS $MX4J_PORT"
JVM_OPTS="$JVM_OPTS $JVM_EXTRA_OPTS"
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/lib/licenses/netty-4.1.14.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/netty-4.1.14.txt b/lib/licenses/netty-4.1.14.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/lib/licenses/netty-4.1.14.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/lib/licenses/netty-all-4.0.44.Final.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/netty-all-4.0.44.Final.txt b/lib/licenses/netty-all-4.0.44.Final.txt
deleted file mode 100644
index d645695..0000000
--- a/lib/licenses/netty-all-4.0.44.Final.txt
+++ /dev/null
@@ -1,202 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/lib/netty-all-4.0.44.Final.jar
----------------------------------------------------------------------
diff --git a/lib/netty-all-4.0.44.Final.jar b/lib/netty-all-4.0.44.Final.jar
deleted file mode 100644
index 9c5bda5..0000000
Binary files a/lib/netty-all-4.0.44.Final.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/lib/netty-all-4.1.14.Final.jar
----------------------------------------------------------------------
diff --git a/lib/netty-all-4.1.14.Final.jar b/lib/netty-all-4.1.14.Final.jar
new file mode 100644
index 0000000..e5c8137
Binary files /dev/null and b/lib/netty-all-4.1.14.Final.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 5a45282..77d5bf4 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -276,6 +276,7 @@ public class Config
public int tracetype_repair_ttl = (int) TimeUnit.DAYS.toSeconds(7);
/**
+<<<<<<< HEAD
* Maintain statistics on whether writes achieve the ideal consistency level
* before expiring and becoming hints
*/
@@ -283,8 +284,12 @@ public class Config
/*
* Strategy to use for coalescing messages in OutboundTcpConnection.
+=======
+ * Strategy to use for coalescing messages in {@link OutboundMessagingPool}.
+>>>>>>> 2acc8dbc74... switch internode messaging to netty
* Can be fixed, movingaverage, timehorizon, disabled. Setting is case and leading/trailing
- * whitespace insensitive. You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+ * whitespace insensitive. You can also specify a subclass of
+ * {@link org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy} by name.
*/
public String otc_coalescing_strategy = "DISABLED";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index fb50826..53bac93 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1567,6 +1567,11 @@ public class DatabaseDescriptor
return conf.listen_on_broadcast_address;
}
+ public static void setListenOnBroadcastAddress(boolean listen_on_broadcast_address)
+ {
+ conf.listen_on_broadcast_address = listen_on_broadcast_address;
+ }
+
public static IInternodeAuthenticator getInternodeAuthenticator()
{
return internodeAuthenticator;
@@ -2095,6 +2100,11 @@ public class DatabaseDescriptor
return conf.internode_compression;
}
+ public static void setInternodeCompression(Config.InternodeCompression compression)
+ {
+ conf.internode_compression = compression;
+ }
+
public static boolean getInterDCTcpNoDelay()
{
return conf.inter_dc_tcp_nodelay;
@@ -2156,6 +2166,11 @@ public class DatabaseDescriptor
return conf.otc_coalescing_strategy;
}
+ public static void setOtcCoalescingStrategy(String strategy)
+ {
+ conf.otc_coalescing_strategy = strategy;
+ }
+
public static int getOtcCoalescingWindow()
{
return conf.otc_coalescing_window_us;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/config/EncryptionOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/EncryptionOptions.java b/src/java/org/apache/cassandra/config/EncryptionOptions.java
index d662871..6010746 100644
--- a/src/java/org/apache/cassandra/config/EncryptionOptions.java
+++ b/src/java/org/apache/cassandra/config/EncryptionOptions.java
@@ -17,15 +17,13 @@
*/
package org.apache.cassandra.config;
-import javax.net.ssl.SSLSocketFactory;
-
public abstract class EncryptionOptions
{
public String keystore = "conf/.keystore";
public String keystore_password = "cassandra";
public String truststore = "conf/.truststore";
public String truststore_password = "cassandra";
- public String[] cipher_suites = ((SSLSocketFactory)SSLSocketFactory.getDefault()).getDefaultCipherSuites();
+ public String[] cipher_suites = {};
public String protocol = "TLS";
public String algorithm = "SunX509";
public String store_type = "JKS";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/db/TypeSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TypeSizes.java b/src/java/org/apache/cassandra/db/TypeSizes.java
index 6543025..b47e300 100644
--- a/src/java/org/apache/cassandra/db/TypeSizes.java
+++ b/src/java/org/apache/cassandra/db/TypeSizes.java
@@ -28,6 +28,7 @@ public final class TypeSizes
private TypeSizes(){}
private static final int BOOL_SIZE = 1;
+ private static final int BYTE_SIZE = 1;
private static final int SHORT_SIZE = 2;
private static final int INT_SIZE = 4;
private static final int LONG_SIZE = 8;
@@ -78,6 +79,11 @@ public final class TypeSizes
return BOOL_SIZE;
}
+ public static int sizeof(byte value)
+ {
+ return BYTE_SIZE;
+ }
+
public static int sizeof(short value)
{
return SHORT_SIZE;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
index 8cc6549..2908976 100644
--- a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
@@ -192,7 +192,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
throw new ConfigurationException(String.format("Snitch definitions at %s do not define a location for " +
"this node's broadcast address %s, nor does it provides a default",
SNITCH_PROPERTIES_FILENAME, broadcastAddress));
- // OutboundTcpConnectionPool.getEndpoint() converts our broadcast address to local,
+ // internode messaging code converts our broadcast address to local,
// make sure we can be found at that as well.
InetAddress localAddress = FBUtilities.getLocalAddress();
if (!localAddress.equals(broadcastAddress) && !reloadedMap.containsKey(localAddress))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
index 08f0a14..2235c76 100644
--- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
+++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
@@ -23,9 +23,9 @@ import java.net.UnknownHostException;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.OutboundTcpConnectionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,18 +64,16 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
@VisibleForTesting
static void reconnect(InetAddress publicAddress, InetAddress localAddress, IEndpointSnitch snitch, String localDc)
{
- OutboundTcpConnectionPool cp = MessagingService.instance().getConnectionPool(publicAddress);
- //InternodeAuthenticator said don't connect
- if (cp == null)
+ if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress, MessagingService.portFor(publicAddress)))
{
logger.debug("InternodeAuthenticator said don't reconnect to {} on {}", publicAddress, localAddress);
return;
}
if (snitch.getDatacenter(publicAddress).equals(localDc)
- && !cp.endPoint().equals(localAddress))
+ && !MessagingService.instance().getCurrentEndpoint(publicAddress).equals(localAddress))
{
- cp.reset(localAddress);
+ MessagingService.instance().reconnectWithNewIp(publicAddress, localAddress);
logger.debug("Initiated reconnect to an Internal IP {} for the {}", localAddress, publicAddress);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
index f01c06d..7815784 100644
--- a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
@@ -21,14 +21,12 @@ import java.net.InetAddress;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
+import org.apache.cassandra.net.async.OutboundMessagingPool;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
-
-import org.apache.cassandra.net.OutboundTcpConnectionPool;
-
/**
- * Metrics for {@link OutboundTcpConnectionPool}.
+ * Metrics for internode connections.
*/
public class ConnectionMetrics
{
@@ -66,9 +64,8 @@ public class ConnectionMetrics
* Create metrics for given connection pool.
*
* @param ip IP address to use for metrics label
- * @param connectionPool Connection pool
*/
- public ConnectionMetrics(InetAddress ip, final OutboundTcpConnectionPool connectionPool)
+ public ConnectionMetrics(InetAddress ip, final OutboundMessagingPool messagingPool)
{
// ipv6 addresses will contain colons, which are invalid in a JMX ObjectName
address = ip.getHostAddress().replace(':', '.');
@@ -79,63 +76,63 @@ public class ConnectionMetrics
{
public Integer getValue()
{
- return connectionPool.largeMessages.getPendingMessages();
+ return messagingPool.largeMessageChannel.getPendingMessages();
}
});
largeMessageCompletedTasks = Metrics.register(factory.createMetricName("LargeMessageCompletedTasks"), new Gauge<Long>()
{
public Long getValue()
{
- return connectionPool.largeMessages.getCompletedMesssages();
+ return messagingPool.largeMessageChannel.getCompletedMessages();
}
});
largeMessageDroppedTasks = Metrics.register(factory.createMetricName("LargeMessageDroppedTasks"), new Gauge<Long>()
{
public Long getValue()
{
- return connectionPool.largeMessages.getDroppedMessages();
+ return messagingPool.largeMessageChannel.getDroppedMessages();
}
});
smallMessagePendingTasks = Metrics.register(factory.createMetricName("SmallMessagePendingTasks"), new Gauge<Integer>()
{
public Integer getValue()
{
- return connectionPool.smallMessages.getPendingMessages();
+ return messagingPool.smallMessageChannel.getPendingMessages();
}
});
smallMessageCompletedTasks = Metrics.register(factory.createMetricName("SmallMessageCompletedTasks"), new Gauge<Long>()
{
public Long getValue()
{
- return connectionPool.smallMessages.getCompletedMesssages();
+ return messagingPool.smallMessageChannel.getCompletedMessages();
}
});
smallMessageDroppedTasks = Metrics.register(factory.createMetricName("SmallMessageDroppedTasks"), new Gauge<Long>()
{
public Long getValue()
{
- return connectionPool.smallMessages.getDroppedMessages();
+ return messagingPool.smallMessageChannel.getDroppedMessages();
}
});
gossipMessagePendingTasks = Metrics.register(factory.createMetricName("GossipMessagePendingTasks"), new Gauge<Integer>()
{
public Integer getValue()
{
- return connectionPool.gossipMessages.getPendingMessages();
+ return messagingPool.gossipChannel.getPendingMessages();
}
});
gossipMessageCompletedTasks = Metrics.register(factory.createMetricName("GossipMessageCompletedTasks"), new Gauge<Long>()
{
public Long getValue()
{
- return connectionPool.gossipMessages.getCompletedMesssages();
+ return messagingPool.gossipChannel.getCompletedMessages();
}
});
gossipMessageDroppedTasks = Metrics.register(factory.createMetricName("GossipMessageDroppedTasks"), new Gauge<Long>()
{
public Long getValue()
{
- return connectionPool.gossipMessages.getDroppedMessages();
+ return messagingPool.gossipChannel.getDroppedMessages();
}
});
timeouts = Metrics.meter(factory.createMetricName("Timeouts"));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
deleted file mode 100644
index 67e54c8..0000000
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.zip.Checksum;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.util.concurrent.FastThreadLocalThread;
-import net.jpountz.lz4.LZ4BlockInputStream;
-import net.jpountz.lz4.LZ4FastDecompressor;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.xxhash.XXHashFactory;
-
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.exceptions.UnknownTableException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.monitoring.ApproximateTime;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
-import org.apache.cassandra.io.util.NIODataInputStream;
-
-public class IncomingTcpConnection extends FastThreadLocalThread implements Closeable
-{
- private static final Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class);
-
- private static final int BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + ".itc_buffer_size", 1024 * 4);
-
- private final int version;
- private final boolean compressed;
- private final Socket socket;
- private final Set<Closeable> group;
- public InetAddress from;
-
- public IncomingTcpConnection(int version, boolean compressed, Socket socket, Set<Closeable> group)
- {
- super("MessagingService-Incoming-" + socket.getInetAddress());
- this.version = version;
- this.compressed = compressed;
- this.socket = socket;
- this.group = group;
- if (DatabaseDescriptor.getInternodeRecvBufferSize() > 0)
- {
- try
- {
- this.socket.setReceiveBufferSize(DatabaseDescriptor.getInternodeRecvBufferSize());
- }
- catch (SocketException se)
- {
- logger.warn("Failed to set receive buffer size on internode socket.", se);
- }
- }
- }
-
- /**
- * A new connection will either stream or message for its entire lifetime: because streaming
- * bypasses the InputStream implementations to use sendFile, we cannot begin buffering until
- * we've determined the type of the connection.
- */
- @Override
- public void run()
- {
- try
- {
- if (version < MessagingService.VERSION_30)
- throw new UnsupportedOperationException(String.format("Unable to read obsolete message version %s; "
- + "The earliest version supported is 3.0.0",
- version));
-
- receiveMessages();
- }
- catch (EOFException e)
- {
- logger.trace("eof reading from socket; closing", e);
- // connection will be reset so no need to throw an exception.
- }
- catch (UnknownTableException e)
- {
- logger.warn("UnknownTableException reading from socket; closing", e);
- }
- catch (IOException e)
- {
- logger.trace("IOException reading from socket; closing", e);
- }
- finally
- {
- close();
- }
- }
-
- @Override
- public void close()
- {
- try
- {
- if (logger.isTraceEnabled())
- logger.trace("Closing socket {} - isclosed: {}", socket, socket.isClosed());
- if (!socket.isClosed())
- {
- socket.close();
- }
- }
- catch (IOException e)
- {
- logger.trace("Error closing socket", e);
- }
- finally
- {
- group.remove(this);
- }
- }
-
- @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the stream needs to remain open.
- private void receiveMessages() throws IOException
- {
- // handshake (true) endpoint versions
- DataOutputStream out = new DataOutputStream(socket.getOutputStream());
- // if this version is < the MS version the other node is trying
- // to connect with, the other node will disconnect
- out.writeInt(MessagingService.current_version);
- out.flush();
- DataInputPlus in = new DataInputStreamPlus(socket.getInputStream());
- int maxVersion = in.readInt();
- // outbound side will reconnect if necessary to upgrade version
- assert version <= MessagingService.current_version;
- from = CompactEndpointSerializationHelper.deserialize(in);
- // record the (true) version of the endpoint
- MessagingService.instance().setVersion(from, maxVersion);
- logger.trace("Set version for {} to {} (will use {})", from, maxVersion, MessagingService.instance().getVersion(from));
-
- if (compressed)
- {
- logger.trace("Upgrading incoming connection to be compressed");
- LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
- Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(OutboundTcpConnection.LZ4_HASH_SEED).asChecksum();
- in = new DataInputStreamPlus(new LZ4BlockInputStream(socket.getInputStream(),
- decompressor,
- checksum));
- }
- else
- {
- ReadableByteChannel channel = socket.getChannel();
- in = new NIODataInputStream(channel != null ? channel : Channels.newChannel(socket.getInputStream()), BUFFER_SIZE);
- }
-
- while (true)
- {
- MessagingService.validateMagic(in.readInt());
- receiveMessage(in, version);
- }
- }
-
- private InetAddress receiveMessage(DataInputPlus input, int version) throws IOException
- {
- int id = input.readInt();
-
- long currentTime = ApproximateTime.currentTimeMillis();
- MessageIn message = MessageIn.read(input, version, id, MessageIn.readConstructionTime(from, input, currentTime));
- if (message == null)
- {
- // callback expired; nothing to do
- return null;
- }
- if (version <= MessagingService.current_version)
- {
- MessagingService.instance().receive(message, id);
- }
- else
- {
- logger.trace("Received connection from newer protocol version {}. Ignoring message", version);
- }
- return message.from;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index 8774d38..d520fa9 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -31,20 +31,27 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.net.MessagingService.Verb;
+/**
+ * The receiving node's view of a {@link MessageOut}. See documentation on {@link MessageOut} for details on the
+ * serialization format.
+ *
+ * @param <T> The type of the payload
+ */
public class MessageIn<T>
{
public final InetAddress from;
public final T payload;
public final Map<String, byte[]> parameters;
- public final MessagingService.Verb verb;
+ public final Verb verb;
public final int version;
public final long constructionTime;
private MessageIn(InetAddress from,
T payload,
Map<String, byte[]> parameters,
- MessagingService.Verb verb,
+ Verb verb,
int version,
long constructionTime)
{
@@ -59,7 +66,7 @@ public class MessageIn<T>
public static <T> MessageIn<T> create(InetAddress from,
T payload,
Map<String, byte[]> parameters,
- MessagingService.Verb verb,
+ Verb verb,
int version,
long constructionTime)
{
@@ -85,11 +92,17 @@ public class MessageIn<T>
InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
+ Map<String, byte[]> parameters = readParameters(in);
+ int payloadSize = in.readInt();
+ return read(in, version, id, constructionTime, from, payloadSize, verb, parameters);
+ }
+
+ public static Map<String, byte[]> readParameters(DataInputPlus in) throws IOException
+ {
int parameterCount = in.readInt();
- Map<String, byte[]> parameters;
if (parameterCount == 0)
{
- parameters = Collections.emptyMap();
+ return Collections.emptyMap();
}
else
{
@@ -101,10 +114,13 @@ public class MessageIn<T>
in.readFully(value);
builder.put(key, value);
}
- parameters = builder.build();
+ return builder.build();
}
+ }
- int payloadSize = in.readInt();
+ public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, long constructionTime,
+ InetAddress from, int payloadSize, Verb verb, Map<String, byte[]> parameters) throws IOException
+ {
IVersionedSerializer<T2> serializer = (IVersionedSerializer<T2>) MessagingService.verbSerializers.get(verb);
if (serializer instanceof MessagingService.CallbackDeterminedSerializer)
{
@@ -124,12 +140,11 @@ public class MessageIn<T>
return MessageIn.create(from, payload, parameters, verb, version, constructionTime);
}
- public static long readConstructionTime(InetAddress from, DataInputPlus input, long currentTime) throws IOException
+ public static long deriveConstructionTime(InetAddress from, int messageTimestamp, long currentTime)
{
// Reconstruct the message construction time sent by the remote host (we sent only the lower 4 bytes, assuming the
// higher 4 bytes wouldn't change between the sender and receiver)
- int partial = input.readInt(); // make sure to readInt, even if cross_node_to is not enabled
- long sentConstructionTime = (currentTime & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+ long sentConstructionTime = (currentTime & 0xFFFFFFFF00000000L) | (((messageTimestamp & 0xFFFFFFFFL) << 2) >> 2);
// Because nodes may not have their clock perfectly in sync, it's actually possible the sentConstructionTime is
// later than the currentTime (the received time). If that's the case, as we definitively know there is a lack
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index a38aed5..379aff5 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -29,21 +29,78 @@ import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
import static org.apache.cassandra.tracing.Tracing.isTracing;
+/**
+ * Each message contains a header with several fixed fields, an optional key-value parameters section, and then
+ * the message payload itself. Note: the IP address in the header may be either IPv4 (4 bytes) or IPv6 (16 bytes).
+ * The diagram below shows the IPv4 address for brevity.
+ *
+ * <pre>
+ * {@code
+ * 1 1 1 1 1 2 2 2 2 2 3 3 3 3 3 4 4 4 4 4 5 5 5 5 5 6 6
+ * 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2 4 6 8 0 2
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | PROTOCOL MAGIC |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Message ID |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Timestamp |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Addr len | IP Address (IPv4) /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * / | Verb /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * / | Parameters size /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * / | Parameter data /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * / |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Payload size |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | /
+ * / Payload /
+ * / |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * }
+ * </pre>
+ *
+ * An individual parameter has a String key and a byte array value. The key is serialized with it's length,
+ * encoded as two bytes, followed by the UTF-8 byte encoding of the string (see {@link java.io.DataOutput#writeUTF(java.lang.String)}).
+ * The body is serialized with it's length, encoded as four bytes, followed by the bytes of the value.
+ *
+ * * @param <T> The type of the message payload.
+ */
public class MessageOut<T>
{
+ private static final int SERIALIZED_SIZE_VERSION_UNDEFINED = -1;
+
public final InetAddress from;
public final MessagingService.Verb verb;
public final T payload;
public final IVersionedSerializer<T> serializer;
public final Map<String, byte[]> parameters;
- private long payloadSize = -1;
- private int payloadSizeVersion = -1;
+
+ /**
+ * Memoization of the serialized size of the just the payload.
+ */
+ private int payloadSerializedSize = -1;
+
+ /**
+ * Memoization of the serialized size of the entire message.
+ */
+ private int serializedSize = -1;
+
+ /**
+ * The internode protocol messaging version that was used to calculate the memoized serailized sizes.
+ */
+ private int serializedSizeVersion = SERIALIZED_SIZE_VERSION_UNDEFINED;
// we do support messages that just consist of a verb
public MessageOut(MessagingService.Verb verb)
@@ -115,14 +172,12 @@ public class MessageOut<T>
if (payload != null)
{
- try(DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
- {
- serializer.serialize(payload, dob, version);
-
- int size = dob.getLength();
- out.writeInt(size);
- out.write(dob.getData(), 0, size);
- }
+ int payloadSize = payloadSerializedSize >= 0
+ ? (int)payloadSerializedSize
+ : (int) serializer.serializedSize(payload, version);
+
+ out.writeInt(payloadSize);
+ serializer.serialize(payload, out, version);
}
else
{
@@ -130,9 +185,9 @@ public class MessageOut<T>
}
}
- public int serializedSize(int version)
+ private Pair<Long, Long> calculateSerializedSize(int version)
{
- int size = CompactEndpointSerializationHelper.serializedSize(from);
+ long size = CompactEndpointSerializationHelper.serializedSize(from);
size += TypeSizes.sizeof(verb.getId());
size += TypeSizes.sizeof(parameters.size());
@@ -143,38 +198,43 @@ public class MessageOut<T>
size += entry.getValue().length;
}
- long longSize = payloadSize(version);
- assert longSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages
- size += TypeSizes.sizeof((int) longSize);
- size += longSize;
- return size;
+ long payloadSize = payload == null ? 0 : serializer.serializedSize(payload, version);
+ assert payloadSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages
+ size += TypeSizes.sizeof((int) payloadSize);
+ size += payloadSize;
+ return Pair.create(size, payloadSize);
}
/**
- * Calculate the size of the payload of this message for the specified protocol version
- * and memoize the result for the specified protocol version. Memoization only covers the protocol
- * version of the first invocation.
+ * Calculate the size of this message for the specified protocol version and memoize the result for the specified
+ * protocol version. Memoization only covers the protocol version of the first invocation.
*
- * It is not safe to call payloadSize concurrently from multiple threads unless it has already been invoked
+ * It is not safe to call this function concurrently from multiple threads unless it has already been invoked
* once from a single thread and there is a happens before relationship between that invocation and other
- * threads concurrently invoking payloadSize.
+ * threads concurrently invoking this function.
*
* For instance it would be safe to invokePayload size to make a decision in the thread that created the message
* and then hand it off to other threads via a thread-safe queue, volatile write, or synchronized/ReentrantLock.
- * @param version Protocol version to use when calculating payload size
- * @return Size of the payload of this message in bytes
+ *
+ * @param version Protocol version to use when calculating size
+ * @return Size of this message in bytes, which will be less than or equal to {@link Integer#MAX_VALUE}
*/
- public long payloadSize(int version)
+ public int serializedSize(int version)
{
- if (payloadSize == -1)
- {
- payloadSize = payload == null ? 0 : serializer.serializedSize(payload, version);
- payloadSizeVersion = version;
- }
- else if (payloadSizeVersion != version)
+ if (serializedSize > 0 && serializedSizeVersion == version)
+ return serializedSize;
+
+ Pair<Long, Long> sizes = calculateSerializedSize(version);
+ if (sizes.left > Integer.MAX_VALUE)
+ throw new IllegalStateException("message size exceeds maximum allowed size: size = " + sizes.left);
+
+ if (serializedSizeVersion == SERIALIZED_SIZE_VERSION_UNDEFINED)
{
- return payload == null ? 0 : serializer.serializedSize(payload, version);
+ serializedSize = sizes.left.intValue();
+ payloadSerializedSize = sizes.right.intValue();
+ serializedSizeVersion = version;
}
- return payloadSize;
+
+ return sizes.left.intValue();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org