You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/02/18 18:22:47 UTC
[1/8] git commit: Allow per-dc enabling of hints patch by Sankalp
Kohli; reviewed by Lyuben Todorov for CASSANDRA-6157
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 972cffd54 -> 500c62d6b
refs/heads/cassandra-2.1 5f82aa3b0 -> f628bd8a4
refs/heads/trunk 434e04281 -> ba23b8206
Allow per-dc enabling of hints
patch by Sankalp Kohli; reviewed by Lyuben Todorov for CASSANDRA-6157
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/500c62d6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/500c62d6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/500c62d6
Branch: refs/heads/cassandra-2.0
Commit: 500c62d6b98f5b5c15b91f1d38f0132b846c6b48
Parents: 972cffd
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Feb 18 11:21:13 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Feb 18 11:21:13 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
build.xml | 2 +
conf/cassandra.yaml | 3 +
lib/licenses/super-csv-2.1.0.txt | 202 +++++++++++++++++++
lib/super-csv-2.1.0.jar | Bin 0 -> 91473 bytes
.../org/apache/cassandra/config/Config.java | 49 ++++-
.../cassandra/config/DatabaseDescriptor.java | 40 +++-
.../config/YamlConfigurationLoader.java | 1 +
.../apache/cassandra/service/StorageProxy.java | 22 +-
.../cassandra/service/StorageProxyMBean.java | 3 +
.../org/apache/cassandra/tools/NodeCmd.java | 5 +-
.../org/apache/cassandra/tools/NodeProbe.java | 5 +
12 files changed, 329 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bdfec11..f0c116f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.6
+ * Allow per-dc enabling of hints (CASSANDRA-6157)
* Add compatibility for Hadoop 0.2.x (CASSANDRA-5201)
* Fix EstimatedHistogram races (CASSANDRA-6682)
* Failure detector correctly converts initial value to nanos (CASSANDRA-6658)
@@ -27,6 +28,7 @@ Merged from 1.2:
* Fix SecondaryIndexManager#deleteFromIndexes() (CASSANDRA-6711)
* Fix snapshot repair not snapshotting coordinator itself (CASSANDRA-6713)
+
2.0.5
* Reduce garbage generated by bloom filter lookups (CASSANDRA-6609)
* Add ks.cf names to tombstone logging (CASSANDRA-6597)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index fb774f6..77b2639 100644
--- a/build.xml
+++ b/build.xml
@@ -381,6 +381,7 @@
<dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" />
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
<dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
+ <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
</dependencyManagement>
<developer id="alakshman" name="Avinash Lakshman"/>
<developer id="antelder" name="Anthony Elder"/>
@@ -461,6 +462,7 @@
<dependency groupId="com.yammer.metrics" artifactId="metrics-core"/>
<dependency groupId="com.addthis.metrics" artifactId="reporter-config"/>
<dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.3"/>
+ <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
<dependency groupId="log4j" artifactId="log4j"/>
<!-- cassandra has a hard dependency on log4j, so force slf4j's log4j provider at runtime -->
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index bfe60c4..4c9ad67 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -29,6 +29,9 @@ num_tokens: 256
# that do not have vnodes enabled.
# initial_token:
+# May either be "true" or "false" to enable globally, or contain a list
+# of data centers to enable per-datacenter.
+# hinted_handoff_enabled: DC1,DC2
# See http://wiki.apache.org/cassandra/HintedHandoff
hinted_handoff_enabled: true
# this defines the maximum amount of time a dead host will have hints
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/lib/licenses/super-csv-2.1.0.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/super-csv-2.1.0.txt b/lib/licenses/super-csv-2.1.0.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/lib/licenses/super-csv-2.1.0.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/lib/super-csv-2.1.0.jar
----------------------------------------------------------------------
diff --git a/lib/super-csv-2.1.0.jar b/lib/super-csv-2.1.0.jar
new file mode 100644
index 0000000..6a85716
Binary files /dev/null and b/lib/super-csv-2.1.0.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2fa49f3..ceb8df0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -17,8 +17,18 @@
*/
package org.apache.cassandra.config;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.supercsv.io.CsvListReader;
+import org.supercsv.prefs.CsvPreference;
+
import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.NativeAllocator;
import org.apache.cassandra.utils.FBUtilities;
@@ -38,7 +48,9 @@ public class Config
public String partitioner;
public Boolean auto_bootstrap = true;
- public volatile Boolean hinted_handoff_enabled = true;
+ public volatile boolean hinted_handoff_enabled_global = true;
+ public String hinted_handoff_enabled;
+ public Set<String> hinted_handoff_enabled_by_dc = Sets.newConcurrentHashSet();
public volatile Integer max_hint_window_in_ms = 3600 * 1000; // one hour
public SeedProviderDef seed_provider;
@@ -185,6 +197,9 @@ public class Config
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;
+ private static final CsvPreference STANDARD_SURROUNDING_SPACES_NEED_QUOTES = new CsvPreference.Builder(CsvPreference.STANDARD_PREFERENCE)
+ .surroundingSpacesNeedQuotes(true).build();
+
public static boolean getOutboundBindAny()
{
return outboundBindAny;
@@ -205,6 +220,38 @@ public class Config
isClientMode = clientMode;
}
+ public void configHintedHandoff() throws ConfigurationException
+ {
+ if (hinted_handoff_enabled != null && !hinted_handoff_enabled.isEmpty())
+ {
+ if (hinted_handoff_enabled.toLowerCase().equalsIgnoreCase("true"))
+ {
+ hinted_handoff_enabled_global = true;
+ }
+ else if (hinted_handoff_enabled.toLowerCase().equalsIgnoreCase("false"))
+ {
+ hinted_handoff_enabled_global = false;
+ }
+ else
+ {
+ try
+ {
+ hinted_handoff_enabled_by_dc.addAll(parseHintedHandoffEnabledDCs(hinted_handoff_enabled));
+ }
+ catch (IOException e)
+ {
+ throw new ConfigurationException("Invalid hinted_handoff_enabled parameter " + hinted_handoff_enabled, e);
+ }
+ }
+ }
+ }
+
+ public static List<String> parseHintedHandoffEnabledDCs(final String dcNames) throws IOException
+ {
+ final CsvListReader csvListReader = new CsvListReader(new StringReader(dcNames), STANDARD_SURROUNDING_SPACES_NEED_QUOTES);
+ return csvListReader.read();
+ }
+
public static enum CommitLogSync
{
periodic,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index e1a95ab..9e06601 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.config;
import java.io.File;
import java.io.FileFilter;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
@@ -1109,12 +1110,47 @@ public class DatabaseDescriptor
public static void setHintedHandoffEnabled(boolean hintedHandoffEnabled)
{
- conf.hinted_handoff_enabled = hintedHandoffEnabled;
+ conf.hinted_handoff_enabled_global = hintedHandoffEnabled;
+ conf.hinted_handoff_enabled_by_dc.clear();
+ }
+
+ public static void setHintedHandoffEnabled(final String dcNames)
+ {
+ List<String> dcNameList;
+ try
+ {
+ dcNameList = Config.parseHintedHandoffEnabledDCs(dcNames);
+ }
+ catch (IOException e)
+ {
+ throw new IllegalArgumentException("Could not read csv of dcs for hinted handoff enable. " + dcNames, e);
+ }
+
+ if (dcNameList.isEmpty())
+ throw new IllegalArgumentException("Empty list of Dcs for hinted handoff enable");
+
+ conf.hinted_handoff_enabled_by_dc.clear();
+ conf.hinted_handoff_enabled_by_dc.addAll(dcNameList);
}
public static boolean hintedHandoffEnabled()
{
- return conf.hinted_handoff_enabled;
+ return conf.hinted_handoff_enabled_global;
+ }
+
+ public static Set<String> hintedHandoffEnabledByDC()
+ {
+ return Collections.unmodifiableSet(conf.hinted_handoff_enabled_by_dc);
+ }
+
+ public static boolean shouldHintByDC()
+ {
+ return !conf.hinted_handoff_enabled_by_dc.isEmpty();
+ }
+
+ public static boolean hintedHandoffEnabled(final String dcName)
+ {
+ return conf.hinted_handoff_enabled_by_dc.contains(dcName);
}
public static void setMaxHintWindow(int ms)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index d8a138c..6b5a152 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -91,6 +91,7 @@ public class YamlConfigurationLoader implements ConfigurationLoader
constructor.setPropertyUtils(propertiesChecker);
Yaml yaml = new Yaml(constructor);
Config result = yaml.loadAs(input, Config.class);
+ result.configHintedHandoff();
propertiesChecker.check();
return result;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8d1f913..14c1ce3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1758,11 +1758,21 @@ public class StorageProxy implements StorageProxyMBean
return DatabaseDescriptor.hintedHandoffEnabled();
}
+ public Set<String> getHintedHandoffEnabledByDC()
+ {
+ return DatabaseDescriptor.hintedHandoffEnabledByDC();
+ }
+
public void setHintedHandoffEnabled(boolean b)
{
DatabaseDescriptor.setHintedHandoffEnabled(b);
}
+ public void setHintedHandoffEnabledByDCList(String dcNames)
+ {
+ DatabaseDescriptor.setHintedHandoffEnabled(dcNames);
+ }
+
public int getMaxHintWindow()
{
return DatabaseDescriptor.getMaxHintWindow();
@@ -1775,7 +1785,17 @@ public class StorageProxy implements StorageProxyMBean
public static boolean shouldHint(InetAddress ep)
{
- if (!DatabaseDescriptor.hintedHandoffEnabled())
+ if (DatabaseDescriptor.shouldHintByDC())
+ {
+ final String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(ep);
+ // Disable DC specific hints
+ if(!DatabaseDescriptor.hintedHandoffEnabled(dc))
+ {
+ HintedHandOffManager.instance.metrics.incrPastWindow(ep);
+ return false;
+ }
+ }
+ else if (!DatabaseDescriptor.hintedHandoffEnabled())
{
HintedHandOffManager.instance.metrics.incrPastWindow(ep);
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index ad7d4c7..203cabe 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public interface StorageProxyMBean
{
@@ -72,7 +73,9 @@ public interface StorageProxyMBean
public long getTotalHints();
public boolean getHintedHandoffEnabled();
+ public Set<String> getHintedHandoffEnabledByDC();
public void setHintedHandoffEnabled(boolean b);
+ public void setHintedHandoffEnabledByDCList(String dcs);
public int getMaxHintWindow();
public void setMaxHintWindow(int ms);
public int getMaxHintsInProgress();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index f3cd563..fb29342 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -1229,7 +1229,10 @@ public class NodeCmd
case DISABLEGOSSIP : probe.stopGossiping(); break;
case ENABLEGOSSIP : probe.startGossiping(); break;
case DISABLEHANDOFF : probe.disableHintedHandoff(); break;
- case ENABLEHANDOFF : probe.enableHintedHandoff(); break;
+ case ENABLEHANDOFF :
+ if (arguments.length > 0) { probe.enableHintedHandoff(arguments[0]); }
+ else { probe.enableHintedHandoff(); }
+ break;
case PAUSEHANDOFF : probe.pauseHintsDelivery(); break;
case RESUMEHANDOFF : probe.resumeHintsDelivery(); break;
case DISABLETHRIFT : probe.stopThriftServer(); break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index ffd6203..28cafb7 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -721,6 +721,11 @@ public class NodeProbe
spProxy.setHintedHandoffEnabled(true);
}
+ public void enableHintedHandoff(String dcNames)
+ {
+ spProxy.setHintedHandoffEnabledByDCList(dcNames);
+ }
+
public void pauseHintsDelivery()
{
hhProxy.pauseHintsDelivery(true);
[7/8] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f628bd8a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f628bd8a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f628bd8a
Branch: refs/heads/cassandra-2.1
Commit: f628bd8a4d705c79883da2bb400b08a460fdcc01
Parents: 5f82aa3 500c62d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Feb 18 11:21:29 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Feb 18 11:22:24 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 2 +
conf/cassandra.yaml | 3 +
lib/licenses/super-csv-2.1.0.txt | 202 +++++++++++++++++++
lib/super-csv-2.1.0.jar | Bin 0 -> 91473 bytes
.../org/apache/cassandra/config/Config.java | 49 ++++-
.../cassandra/config/DatabaseDescriptor.java | 40 +++-
.../config/YamlConfigurationLoader.java | 1 +
.../apache/cassandra/service/StorageProxy.java | 22 +-
.../cassandra/service/StorageProxyMBean.java | 3 +
.../org/apache/cassandra/tools/NodeProbe.java | 5 +
.../org/apache/cassandra/tools/NodeTool.java | 9 +-
12 files changed, 332 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ea74c62,f0c116f..ee98f5d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,43 @@@
+2.1.0-beta1
+ * Add flush directory distinct from compaction directories (CASSANDRA-6357)
+ * Require JNA by default (CASSANDRA-6575)
+ * add listsnapshots command to nodetool (CASSANDRA-5742)
+ * Introduce AtomicBTreeColumns (CASSANDRA-6271, 6692)
+ * Multithreaded commitlog (CASSANDRA-3578)
+ * allocate fixed index summary memory pool and resample cold index summaries
+ to use less memory (CASSANDRA-5519)
+ * Removed multithreaded compaction (CASSANDRA-6142)
+ * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
+ * change logging from log4j to logback (CASSANDRA-5883)
+ * switch to LZ4 compression for internode communication (CASSANDRA-5887)
+ * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
+ * Remove 1.2 network compatibility code (CASSANDRA-5960)
+ * Remove leveled json manifest migration code (CASSANDRA-5996)
+ * Remove CFDefinition (CASSANDRA-6253)
+ * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
+ * User-defined types for CQL3 (CASSANDRA-5590)
+ * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
+ * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
+ * Secondary index support for collections (CASSANDRA-4511, 6383)
+ * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
+ * Push composites support in the storage engine
+ (CASSANDRA-5417, CASSANDRA-6520)
+ * Add snapshot space used to cfstats (CASSANDRA-6231)
+ * Add cardinality estimator for key count estimation (CASSANDRA-5906)
+ * CF id is changed to be non-deterministic. Data dir/key cache are created
+ uniquely for CF id (CASSANDRA-5202)
+ * New counters implementation (CASSANDRA-6504)
+ * Replace UnsortedColumns, EmptyColumns, TreeMapBackedSortedColumns with new
+ ArrayBackedSortedColumns (CASSANDRA-6630, CASSANDRA-6662, CASSANDRA-6690)
+ * Add option to use row cache with a given amount of rows (CASSANDRA-5357)
+ * Avoid repairing already repaired data (CASSANDRA-5351)
+ * Reject counter updates with USING TTL/TIMESTAMP (CASSANDRA-6649)
+ * Replace index_interval with min/max_index_interval (CASSANDRA-6379)
+ * Lift limitation that order by columns must be selected for IN queries (CASSANDRA-4911)
+
+
2.0.6
+ * Allow per-dc enabling of hints (CASSANDRA-6157)
* Add compatibility for Hadoop 0.2.x (CASSANDRA-5201)
* Fix EstimatedHistogram races (CASSANDRA-6682)
* Failure detector correctly converts initial value to nanos (CASSANDRA-6658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/build.xml
----------------------------------------------------------------------
diff --cc build.xml
index 1a8de44,77b2639..7afe3bf
--- a/build.xml
+++ b/build.xml
@@@ -384,11 -378,10 +384,12 @@@
<dependency groupId="org.apache.cassandra" artifactId="cassandra-thrift" version="${version}" />
<dependency groupId="com.yammer.metrics" artifactId="metrics-core" version="2.2.0" />
<dependency groupId="com.addthis.metrics" artifactId="reporter-config" version="2.1.0" />
- <dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" />
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
+ <dependency groupId="io.airlift" artifactId="airline" version="0.6" />
<dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
+ <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
+ <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
+ <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
</dependencyManagement>
<developer id="alakshman" name="Avinash Lakshman"/>
<developer id="antelder" name="Anthony Elder"/>
@@@ -472,10 -462,11 +473,11 @@@
<dependency groupId="com.yammer.metrics" artifactId="metrics-core"/>
<dependency groupId="com.addthis.metrics" artifactId="reporter-config"/>
<dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.3"/>
+ <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
+ <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
- <dependency groupId="log4j" artifactId="log4j"/>
- <!-- cassandra has a hard dependency on log4j, so force slf4j's log4j provider at runtime -->
- <dependency groupId="org.slf4j" artifactId="slf4j-log4j12" scope="runtime"/>
+ <dependency groupId="ch.qos.logback" artifactId="logback-core"/>
+ <dependency groupId="ch.qos.logback" artifactId="logback-classic"/>
<dependency groupId="org.apache.thrift" artifactId="libthrift"/>
<dependency groupId="org.apache.cassandra" artifactId="cassandra-thrift"/>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/conf/cassandra.yaml
----------------------------------------------------------------------
diff --cc conf/cassandra.yaml
index 2666316,4c9ad67..41bc038
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -29,7 -29,10 +29,10 @@@ num_tokens: 25
# that do not have vnodes enabled.
# initial_token:
+# See http://wiki.apache.org/cassandra/HintedHandoff
+ # May either be "true" or "false" to enable globally, or contain a list
+ # of data centers to enable per-datacenter.
+ # hinted_handoff_enabled: DC1,DC2
-# See http://wiki.apache.org/cassandra/HintedHandoff
hinted_handoff_enabled: true
# this defines the maximum amount of time a dead host will have hints
# generated. After it has been dead this long, new hints for it will not be
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index d5108e3,ceb8df0..ee55c92
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -199,9 -197,9 +211,12 @@@ public class Confi
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;
+ public volatile Long index_summary_capacity_in_mb;
+ public volatile int index_summary_resize_interval_in_minutes = 60;
+
+ private static final CsvPreference STANDARD_SURROUNDING_SPACES_NEED_QUOTES = new CsvPreference.Builder(CsvPreference.STANDARD_PREFERENCE)
+ .surroundingSpacesNeedQuotes(true).build();
+
public static boolean getOutboundBindAny()
{
return outboundBindAny;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index aefc431,6b5a152..4a1280c
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@@ -98,7 -90,8 +98,8 @@@ public class YamlConfigurationLoader im
MissingPropertiesChecker propertiesChecker = new MissingPropertiesChecker();
constructor.setPropertyUtils(propertiesChecker);
Yaml yaml = new Yaml(constructor);
- Config result = yaml.loadAs(input, Config.class);
+ Config result = yaml.loadAs(new ByteArrayInputStream(configBytes), Config.class);
+ result.configHintedHandoff();
propertiesChecker.check();
return result;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 05fdd61,14c1ce3..5a51838
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1860,7 -1785,17 +1870,17 @@@ public class StorageProxy implements St
public static boolean shouldHint(InetAddress ep)
{
- if (!DatabaseDescriptor.hintedHandoffEnabled())
+ if (DatabaseDescriptor.shouldHintByDC())
+ {
+ final String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(ep);
- // Disable DC specific hints
++ //Disable DC specific hints
+ if(!DatabaseDescriptor.hintedHandoffEnabled(dc))
+ {
+ HintedHandOffManager.instance.metrics.incrPastWindow(ep);
+ return false;
+ }
+ }
+ else if (!DatabaseDescriptor.hintedHandoffEnabled())
{
HintedHandOffManager.instance.metrics.incrPastWindow(ep);
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
[5/8] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f628bd8a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f628bd8a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f628bd8a
Branch: refs/heads/trunk
Commit: f628bd8a4d705c79883da2bb400b08a460fdcc01
Parents: 5f82aa3 500c62d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Feb 18 11:21:29 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Feb 18 11:22:24 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 2 +
conf/cassandra.yaml | 3 +
lib/licenses/super-csv-2.1.0.txt | 202 +++++++++++++++++++
lib/super-csv-2.1.0.jar | Bin 0 -> 91473 bytes
.../org/apache/cassandra/config/Config.java | 49 ++++-
.../cassandra/config/DatabaseDescriptor.java | 40 +++-
.../config/YamlConfigurationLoader.java | 1 +
.../apache/cassandra/service/StorageProxy.java | 22 +-
.../cassandra/service/StorageProxyMBean.java | 3 +
.../org/apache/cassandra/tools/NodeProbe.java | 5 +
.../org/apache/cassandra/tools/NodeTool.java | 9 +-
12 files changed, 332 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ea74c62,f0c116f..ee98f5d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,43 @@@
+2.1.0-beta1
+ * Add flush directory distinct from compaction directories (CASSANDRA-6357)
+ * Require JNA by default (CASSANDRA-6575)
+ * add listsnapshots command to nodetool (CASSANDRA-5742)
+ * Introduce AtomicBTreeColumns (CASSANDRA-6271, 6692)
+ * Multithreaded commitlog (CASSANDRA-3578)
+ * allocate fixed index summary memory pool and resample cold index summaries
+ to use less memory (CASSANDRA-5519)
+ * Removed multithreaded compaction (CASSANDRA-6142)
+ * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
+ * change logging from log4j to logback (CASSANDRA-5883)
+ * switch to LZ4 compression for internode communication (CASSANDRA-5887)
+ * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
+ * Remove 1.2 network compatibility code (CASSANDRA-5960)
+ * Remove leveled json manifest migration code (CASSANDRA-5996)
+ * Remove CFDefinition (CASSANDRA-6253)
+ * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
+ * User-defined types for CQL3 (CASSANDRA-5590)
+ * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
+ * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
+ * Secondary index support for collections (CASSANDRA-4511, 6383)
+ * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
+ * Push composites support in the storage engine
+ (CASSANDRA-5417, CASSANDRA-6520)
+ * Add snapshot space used to cfstats (CASSANDRA-6231)
+ * Add cardinality estimator for key count estimation (CASSANDRA-5906)
+ * CF id is changed to be non-deterministic. Data dir/key cache are created
+ uniquely for CF id (CASSANDRA-5202)
+ * New counters implementation (CASSANDRA-6504)
+ * Replace UnsortedColumns, EmptyColumns, TreeMapBackedSortedColumns with new
+ ArrayBackedSortedColumns (CASSANDRA-6630, CASSANDRA-6662, CASSANDRA-6690)
+ * Add option to use row cache with a given amount of rows (CASSANDRA-5357)
+ * Avoid repairing already repaired data (CASSANDRA-5351)
+ * Reject counter updates with USING TTL/TIMESTAMP (CASSANDRA-6649)
+ * Replace index_interval with min/max_index_interval (CASSANDRA-6379)
+ * Lift limitation that order by columns must be selected for IN queries (CASSANDRA-4911)
+
+
2.0.6
+ * Allow per-dc enabling of hints (CASSANDRA-6157)
* Add compatibility for Hadoop 0.2.x (CASSANDRA-5201)
* Fix EstimatedHistogram races (CASSANDRA-6682)
* Failure detector correctly converts initial value to nanos (CASSANDRA-6658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/build.xml
----------------------------------------------------------------------
diff --cc build.xml
index 1a8de44,77b2639..7afe3bf
--- a/build.xml
+++ b/build.xml
@@@ -384,11 -378,10 +384,12 @@@
<dependency groupId="org.apache.cassandra" artifactId="cassandra-thrift" version="${version}" />
<dependency groupId="com.yammer.metrics" artifactId="metrics-core" version="2.2.0" />
<dependency groupId="com.addthis.metrics" artifactId="reporter-config" version="2.1.0" />
- <dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" />
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
+ <dependency groupId="io.airlift" artifactId="airline" version="0.6" />
<dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
+ <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
+ <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
+ <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
</dependencyManagement>
<developer id="alakshman" name="Avinash Lakshman"/>
<developer id="antelder" name="Anthony Elder"/>
@@@ -472,10 -462,11 +473,11 @@@
<dependency groupId="com.yammer.metrics" artifactId="metrics-core"/>
<dependency groupId="com.addthis.metrics" artifactId="reporter-config"/>
<dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.3"/>
+ <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
+ <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
- <dependency groupId="log4j" artifactId="log4j"/>
- <!-- cassandra has a hard dependency on log4j, so force slf4j's log4j provider at runtime -->
- <dependency groupId="org.slf4j" artifactId="slf4j-log4j12" scope="runtime"/>
+ <dependency groupId="ch.qos.logback" artifactId="logback-core"/>
+ <dependency groupId="ch.qos.logback" artifactId="logback-classic"/>
<dependency groupId="org.apache.thrift" artifactId="libthrift"/>
<dependency groupId="org.apache.cassandra" artifactId="cassandra-thrift"/>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/conf/cassandra.yaml
----------------------------------------------------------------------
diff --cc conf/cassandra.yaml
index 2666316,4c9ad67..41bc038
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -29,7 -29,10 +29,10 @@@ num_tokens: 25
# that do not have vnodes enabled.
# initial_token:
+# See http://wiki.apache.org/cassandra/HintedHandoff
+ # May either be "true" or "false" to enable globally, or contain a list
+ # of data centers to enable per-datacenter.
+ # hinted_handoff_enabled: DC1,DC2
-# See http://wiki.apache.org/cassandra/HintedHandoff
hinted_handoff_enabled: true
# this defines the maximum amount of time a dead host will have hints
# generated. After it has been dead this long, new hints for it will not be
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index d5108e3,ceb8df0..ee55c92
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -199,9 -197,9 +211,12 @@@ public class Confi
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;
+ public volatile Long index_summary_capacity_in_mb;
+ public volatile int index_summary_resize_interval_in_minutes = 60;
+
+ private static final CsvPreference STANDARD_SURROUNDING_SPACES_NEED_QUOTES = new CsvPreference.Builder(CsvPreference.STANDARD_PREFERENCE)
+ .surroundingSpacesNeedQuotes(true).build();
+
public static boolean getOutboundBindAny()
{
return outboundBindAny;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index aefc431,6b5a152..4a1280c
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@@ -98,7 -90,8 +98,8 @@@ public class YamlConfigurationLoader im
MissingPropertiesChecker propertiesChecker = new MissingPropertiesChecker();
constructor.setPropertyUtils(propertiesChecker);
Yaml yaml = new Yaml(constructor);
- Config result = yaml.loadAs(input, Config.class);
+ Config result = yaml.loadAs(new ByteArrayInputStream(configBytes), Config.class);
+ result.configHintedHandoff();
propertiesChecker.check();
return result;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 05fdd61,14c1ce3..5a51838
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1860,7 -1785,17 +1870,17 @@@ public class StorageProxy implements St
public static boolean shouldHint(InetAddress ep)
{
- if (!DatabaseDescriptor.hintedHandoffEnabled())
+ if (DatabaseDescriptor.shouldHintByDC())
+ {
+ final String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(ep);
- // Disable DC specific hints
++ //Disable DC specific hints
+ if(!DatabaseDescriptor.hintedHandoffEnabled(dc))
+ {
+ HintedHandOffManager.instance.metrics.incrPastWindow(ep);
+ return false;
+ }
+ }
+ else if (!DatabaseDescriptor.hintedHandoffEnabled())
{
HintedHandOffManager.instance.metrics.incrPastWindow(ep);
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
[8/8] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by jb...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ba23b820
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ba23b820
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ba23b820
Branch: refs/heads/trunk
Commit: ba23b82064cb5b8c32687031fd4f0d5a76a4026e
Parents: 434e042 f628bd8
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Feb 18 11:22:33 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Feb 18 11:22:33 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 2 +
conf/cassandra.yaml | 3 +
lib/licenses/super-csv-2.1.0.txt | 202 +++++++++++++++++++
lib/super-csv-2.1.0.jar | Bin 0 -> 91473 bytes
.../org/apache/cassandra/config/Config.java | 49 ++++-
.../cassandra/config/DatabaseDescriptor.java | 40 +++-
.../config/YamlConfigurationLoader.java | 1 +
.../apache/cassandra/service/StorageProxy.java | 22 +-
.../cassandra/service/StorageProxyMBean.java | 3 +
.../org/apache/cassandra/tools/NodeProbe.java | 5 +
.../org/apache/cassandra/tools/NodeTool.java | 9 +-
12 files changed, 332 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
[3/8] git commit: Allow per-dc enabling of hints patch by Sankalp
Kohli; reviewed by Lyuben Todorov for CASSANDRA-6157
Posted by jb...@apache.org.
Allow per-dc enabling of hints
patch by Sankalp Kohli; reviewed by Lyuben Todorov for CASSANDRA-6157
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/500c62d6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/500c62d6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/500c62d6
Branch: refs/heads/trunk
Commit: 500c62d6b98f5b5c15b91f1d38f0132b846c6b48
Parents: 972cffd
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Feb 18 11:21:13 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Feb 18 11:21:13 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
build.xml | 2 +
conf/cassandra.yaml | 3 +
lib/licenses/super-csv-2.1.0.txt | 202 +++++++++++++++++++
lib/super-csv-2.1.0.jar | Bin 0 -> 91473 bytes
.../org/apache/cassandra/config/Config.java | 49 ++++-
.../cassandra/config/DatabaseDescriptor.java | 40 +++-
.../config/YamlConfigurationLoader.java | 1 +
.../apache/cassandra/service/StorageProxy.java | 22 +-
.../cassandra/service/StorageProxyMBean.java | 3 +
.../org/apache/cassandra/tools/NodeCmd.java | 5 +-
.../org/apache/cassandra/tools/NodeProbe.java | 5 +
12 files changed, 329 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bdfec11..f0c116f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.6
+ * Allow per-dc enabling of hints (CASSANDRA-6157)
* Add compatibility for Hadoop 0.2.x (CASSANDRA-5201)
* Fix EstimatedHistogram races (CASSANDRA-6682)
* Failure detector correctly converts initial value to nanos (CASSANDRA-6658)
@@ -27,6 +28,7 @@ Merged from 1.2:
* Fix SecondaryIndexManager#deleteFromIndexes() (CASSANDRA-6711)
* Fix snapshot repair not snapshotting coordinator itself (CASSANDRA-6713)
+
2.0.5
* Reduce garbage generated by bloom filter lookups (CASSANDRA-6609)
* Add ks.cf names to tombstone logging (CASSANDRA-6597)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index fb774f6..77b2639 100644
--- a/build.xml
+++ b/build.xml
@@ -381,6 +381,7 @@
<dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" />
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
<dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
+ <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
</dependencyManagement>
<developer id="alakshman" name="Avinash Lakshman"/>
<developer id="antelder" name="Anthony Elder"/>
@@ -461,6 +462,7 @@
<dependency groupId="com.yammer.metrics" artifactId="metrics-core"/>
<dependency groupId="com.addthis.metrics" artifactId="reporter-config"/>
<dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.3"/>
+ <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
<dependency groupId="log4j" artifactId="log4j"/>
<!-- cassandra has a hard dependency on log4j, so force slf4j's log4j provider at runtime -->
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index bfe60c4..4c9ad67 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -29,6 +29,9 @@ num_tokens: 256
# that do not have vnodes enabled.
# initial_token:
+# May either be "true" or "false" to enable globally, or contain a list
+# of data centers to enable per-datacenter.
+# hinted_handoff_enabled: DC1,DC2
# See http://wiki.apache.org/cassandra/HintedHandoff
hinted_handoff_enabled: true
# this defines the maximum amount of time a dead host will have hints
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/lib/licenses/super-csv-2.1.0.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/super-csv-2.1.0.txt b/lib/licenses/super-csv-2.1.0.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/lib/licenses/super-csv-2.1.0.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/lib/super-csv-2.1.0.jar
----------------------------------------------------------------------
diff --git a/lib/super-csv-2.1.0.jar b/lib/super-csv-2.1.0.jar
new file mode 100644
index 0000000..6a85716
Binary files /dev/null and b/lib/super-csv-2.1.0.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2fa49f3..ceb8df0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -17,8 +17,18 @@
*/
package org.apache.cassandra.config;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.supercsv.io.CsvListReader;
+import org.supercsv.prefs.CsvPreference;
+
import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.NativeAllocator;
import org.apache.cassandra.utils.FBUtilities;
@@ -38,7 +48,9 @@ public class Config
public String partitioner;
public Boolean auto_bootstrap = true;
- public volatile Boolean hinted_handoff_enabled = true;
+ public volatile boolean hinted_handoff_enabled_global = true;
+ public String hinted_handoff_enabled;
+ public Set<String> hinted_handoff_enabled_by_dc = Sets.newConcurrentHashSet();
public volatile Integer max_hint_window_in_ms = 3600 * 1000; // one hour
public SeedProviderDef seed_provider;
@@ -185,6 +197,9 @@ public class Config
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;
+ private static final CsvPreference STANDARD_SURROUNDING_SPACES_NEED_QUOTES = new CsvPreference.Builder(CsvPreference.STANDARD_PREFERENCE)
+ .surroundingSpacesNeedQuotes(true).build();
+
public static boolean getOutboundBindAny()
{
return outboundBindAny;
@@ -205,6 +220,38 @@ public class Config
isClientMode = clientMode;
}
+ public void configHintedHandoff() throws ConfigurationException
+ {
+ if (hinted_handoff_enabled != null && !hinted_handoff_enabled.isEmpty())
+ {
+ if (hinted_handoff_enabled.toLowerCase().equalsIgnoreCase("true"))
+ {
+ hinted_handoff_enabled_global = true;
+ }
+ else if (hinted_handoff_enabled.toLowerCase().equalsIgnoreCase("false"))
+ {
+ hinted_handoff_enabled_global = false;
+ }
+ else
+ {
+ try
+ {
+ hinted_handoff_enabled_by_dc.addAll(parseHintedHandoffEnabledDCs(hinted_handoff_enabled));
+ }
+ catch (IOException e)
+ {
+ throw new ConfigurationException("Invalid hinted_handoff_enabled parameter " + hinted_handoff_enabled, e);
+ }
+ }
+ }
+ }
+
+ public static List<String> parseHintedHandoffEnabledDCs(final String dcNames) throws IOException
+ {
+ final CsvListReader csvListReader = new CsvListReader(new StringReader(dcNames), STANDARD_SURROUNDING_SPACES_NEED_QUOTES);
+ return csvListReader.read();
+ }
+
public static enum CommitLogSync
{
periodic,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index e1a95ab..9e06601 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.config;
import java.io.File;
import java.io.FileFilter;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
@@ -1109,12 +1110,47 @@ public class DatabaseDescriptor
public static void setHintedHandoffEnabled(boolean hintedHandoffEnabled)
{
- conf.hinted_handoff_enabled = hintedHandoffEnabled;
+ conf.hinted_handoff_enabled_global = hintedHandoffEnabled;
+ conf.hinted_handoff_enabled_by_dc.clear();
+ }
+
+ public static void setHintedHandoffEnabled(final String dcNames)
+ {
+ List<String> dcNameList;
+ try
+ {
+ dcNameList = Config.parseHintedHandoffEnabledDCs(dcNames);
+ }
+ catch (IOException e)
+ {
+ throw new IllegalArgumentException("Could not read csv of dcs for hinted handoff enable. " + dcNames, e);
+ }
+
+ if (dcNameList.isEmpty())
+ throw new IllegalArgumentException("Empty list of Dcs for hinted handoff enable");
+
+ conf.hinted_handoff_enabled_by_dc.clear();
+ conf.hinted_handoff_enabled_by_dc.addAll(dcNameList);
}
public static boolean hintedHandoffEnabled()
{
- return conf.hinted_handoff_enabled;
+ return conf.hinted_handoff_enabled_global;
+ }
+
+ public static Set<String> hintedHandoffEnabledByDC()
+ {
+ return Collections.unmodifiableSet(conf.hinted_handoff_enabled_by_dc);
+ }
+
+ public static boolean shouldHintByDC()
+ {
+ return !conf.hinted_handoff_enabled_by_dc.isEmpty();
+ }
+
+ public static boolean hintedHandoffEnabled(final String dcName)
+ {
+ return conf.hinted_handoff_enabled_by_dc.contains(dcName);
}
public static void setMaxHintWindow(int ms)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index d8a138c..6b5a152 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -91,6 +91,7 @@ public class YamlConfigurationLoader implements ConfigurationLoader
constructor.setPropertyUtils(propertiesChecker);
Yaml yaml = new Yaml(constructor);
Config result = yaml.loadAs(input, Config.class);
+ result.configHintedHandoff();
propertiesChecker.check();
return result;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8d1f913..14c1ce3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1758,11 +1758,21 @@ public class StorageProxy implements StorageProxyMBean
return DatabaseDescriptor.hintedHandoffEnabled();
}
+ public Set<String> getHintedHandoffEnabledByDC()
+ {
+ return DatabaseDescriptor.hintedHandoffEnabledByDC();
+ }
+
public void setHintedHandoffEnabled(boolean b)
{
DatabaseDescriptor.setHintedHandoffEnabled(b);
}
+ public void setHintedHandoffEnabledByDCList(String dcNames)
+ {
+ DatabaseDescriptor.setHintedHandoffEnabled(dcNames);
+ }
+
public int getMaxHintWindow()
{
return DatabaseDescriptor.getMaxHintWindow();
@@ -1775,7 +1785,17 @@ public class StorageProxy implements StorageProxyMBean
public static boolean shouldHint(InetAddress ep)
{
- if (!DatabaseDescriptor.hintedHandoffEnabled())
+ if (DatabaseDescriptor.shouldHintByDC())
+ {
+ final String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(ep);
+ // Disable DC specific hints
+ if(!DatabaseDescriptor.hintedHandoffEnabled(dc))
+ {
+ HintedHandOffManager.instance.metrics.incrPastWindow(ep);
+ return false;
+ }
+ }
+ else if (!DatabaseDescriptor.hintedHandoffEnabled())
{
HintedHandOffManager.instance.metrics.incrPastWindow(ep);
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index ad7d4c7..203cabe 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public interface StorageProxyMBean
{
@@ -72,7 +73,9 @@ public interface StorageProxyMBean
public long getTotalHints();
public boolean getHintedHandoffEnabled();
+ public Set<String> getHintedHandoffEnabledByDC();
public void setHintedHandoffEnabled(boolean b);
+ public void setHintedHandoffEnabledByDCList(String dcs);
public int getMaxHintWindow();
public void setMaxHintWindow(int ms);
public int getMaxHintsInProgress();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index f3cd563..fb29342 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -1229,7 +1229,10 @@ public class NodeCmd
case DISABLEGOSSIP : probe.stopGossiping(); break;
case ENABLEGOSSIP : probe.startGossiping(); break;
case DISABLEHANDOFF : probe.disableHintedHandoff(); break;
- case ENABLEHANDOFF : probe.enableHintedHandoff(); break;
+ case ENABLEHANDOFF :
+ if (arguments.length > 0) { probe.enableHintedHandoff(arguments[0]); }
+ else { probe.enableHintedHandoff(); }
+ break;
case PAUSEHANDOFF : probe.pauseHintsDelivery(); break;
case RESUMEHANDOFF : probe.resumeHintsDelivery(); break;
case DISABLETHRIFT : probe.stopThriftServer(); break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index ffd6203..28cafb7 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -721,6 +721,11 @@ public class NodeProbe
spProxy.setHintedHandoffEnabled(true);
}
+ public void enableHintedHandoff(String dcNames)
+ {
+ spProxy.setHintedHandoffEnabledByDCList(dcNames);
+ }
+
public void pauseHintsDelivery()
{
hhProxy.pauseHintsDelivery(true);
[6/8] Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by jb...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeTool.java
index 94bce74,0000000..453491b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@@ -1,2248 -1,0 +1,2255 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.management.MemoryUsage;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import javax.management.openmbean.TabularData;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
+
+import com.yammer.metrics.reporting.JmxReporter;
+import io.airlift.command.*;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStoreMBean;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManagerMBean;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
+import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.service.CacheServiceMBean;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static com.google.common.collect.Iterables.toArray;
+import static com.google.common.collect.Lists.newArrayList;
+import static java.lang.Integer.parseInt;
+import static java.lang.String.format;
+import static org.apache.commons.lang3.ArrayUtils.EMPTY_STRING_ARRAY;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.join;
+
+public class NodeTool
+{
+ private static final String HISTORYFILE = "nodetool.history";
+
+ public static void main(String... args)
+ {
+ List<Class<? extends Runnable>> commands = newArrayList(
+ Help.class,
+ Info.class,
+ Ring.class,
+ NetStats.class,
+ CfStats.class,
+ CfHistograms.class,
+ Cleanup.class,
+ ClearSnapshot.class,
+ Compact.class,
+ Scrub.class,
+ Flush.class,
+ UpgradeSSTable.class,
+ DisableAutoCompaction.class,
+ EnableAutoCompaction.class,
+ CompactionStats.class,
+ CompactionHistory.class,
+ Decommission.class,
+ DescribeCluster.class,
+ DisableBinary.class,
+ EnableBinary.class,
+ EnableGossip.class,
+ DisableGossip.class,
+ EnableHandoff.class,
+ EnableThrift.class,
+ GetCompactionThreshold.class,
+ GetCompactionThroughput.class,
+ GetStreamThroughput.class,
+ GetEndpoints.class,
+ GetSSTables.class,
+ GossipInfo.class,
+ InvalidateKeyCache.class,
+ InvalidateRowCache.class,
+ InvalidateCounterCache.class,
+ Join.class,
+ Move.class,
+ PauseHandoff.class,
+ ResumeHandoff.class,
+ ProxyHistograms.class,
+ Rebuild.class,
+ Refresh.class,
+ RemoveToken.class,
+ RemoveNode.class,
+ Repair.class,
+ SetCacheCapacity.class,
+ SetCompactionThreshold.class,
+ SetCompactionThroughput.class,
+ SetStreamThroughput.class,
+ SetTraceProbability.class,
+ Snapshot.class,
+ ListSnapshots.class,
+ Status.class,
+ StatusBinary.class,
+ StatusThrift.class,
+ Stop.class,
+ StopDaemon.class,
+ Version.class,
+ DescribeRing.class,
+ RebuildIndex.class,
+ RangeKeySample.class,
+ EnableBackup.class,
+ DisableBackup.class,
+ ResetLocalSchema.class,
+ ReloadTriggers.class,
+ SetCacheKeysToSave.class,
+ DisableThrift.class,
+ DisableHandoff.class,
+ Drain.class,
+ TruncateHints.class,
+ TpStats.class,
+ TakeToken.class
+ );
+
+ Cli<Runnable> parser = Cli.<Runnable>builder("nodetool")
+ .withDescription("Manage your Cassandra cluster")
+ .withDefaultCommand(Help.class)
+ .withCommands(commands)
+ .build();
+
+ int status = 0;
+ try
+ {
+ Runnable parse = parser.parse(args);
+ printHistory(args);
+ parse.run();
+ } catch (IllegalArgumentException |
+ IllegalStateException |
+ ParseArgumentsMissingException |
+ ParseArgumentsUnexpectedException |
+ ParseOptionConversionException |
+ ParseOptionMissingException |
+ ParseOptionMissingValueException |
+ ParseCommandMissingException |
+ ParseCommandUnrecognizedException e)
+ {
+ badUse(e);
+ status = 1;
+ } catch (Throwable throwable)
+ {
+ err(Throwables.getRootCause(throwable));
+ status = 2;
+ }
+
+ System.exit(status);
+ }
+
+ private static void printHistory(String... args)
+ {
+ //don't bother to print if no args passed (meaning, nodetool is just printing out the sub-commands list)
+ if (args.length == 0)
+ return;
+
+ String cmdLine = Joiner.on(" ").skipNulls().join(args);
+ cmdLine = cmdLine.replaceFirst("(?<=(-pw|--password))\\s+\\S+", " <hidden>");
+
+ try (FileWriter writer = new FileWriter(new File(FBUtilities.getToolsOutputDirectory(), HISTORYFILE), true))
+ {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ writer.append(sdf.format(new Date())).append(": ").append(cmdLine).append(System.lineSeparator());
+ }
+ catch (IOException ioe)
+ {
+ //quietly ignore any errors about not being able to write out history
+ }
+ }
+
+ private static void badUse(Exception e)
+ {
+ System.out.println("nodetool: " + e.getMessage());
+ System.out.println("See 'nodetool help' or 'nodetool help <command>'.");
+ }
+
+ private static void err(Throwable e)
+ {
+ System.err.println("error: " + e.getMessage());
+ System.err.println("-- StackTrace --");
+ System.err.println(getStackTraceAsString(e));
+ }
+
+ public static abstract class NodeToolCmd implements Runnable
+ {
+
+ @Option(type = OptionType.GLOBAL, name = {"-h", "--host"}, description = "Node hostname or ip address")
+ private String host = "127.0.0.1";
+
+ @Option(type = OptionType.GLOBAL, name = {"-p", "--port"}, description = "Remote jmx agent port number")
+ private String port = "7199";
+
+ @Option(type = OptionType.GLOBAL, name = {"-u", "--username"}, description = "Remote jmx agent username")
+ private String username = EMPTY;
+
+ @Option(type = OptionType.GLOBAL, name = {"-pw", "--password"}, description = "Remote jmx agent password")
+ private String password = EMPTY;
+
+ @Override
+ public void run()
+ {
+ try (NodeProbe probe = connect())
+ {
+ execute(probe);
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error while closing JMX connection", e);
+ }
+
+ }
+
+ protected abstract void execute(NodeProbe probe);
+
+ private NodeProbe connect()
+ {
+ NodeProbe nodeClient = null;
+
+ try
+ {
+ if (username.isEmpty())
+ nodeClient = new NodeProbe(host, parseInt(port));
+ else
+ nodeClient = new NodeProbe(host, parseInt(port), username, password);
+ } catch (IOException e)
+ {
+ Throwable rootCause = Throwables.getRootCause(e);
+ System.err.println(format("nodetool: Failed to connect to '%s:%s' - %s: '%s'.", host, port, rootCause.getClass().getSimpleName(), rootCause.getMessage()));
+ System.exit(1);
+ }
+
+ return nodeClient;
+ }
+
+ protected List<String> parseOptionalKeyspace(List<String> cmdArgs, NodeProbe nodeProbe)
+ {
+ List<String> keyspaces = new ArrayList<>();
+
+ if (cmdArgs == null || cmdArgs.isEmpty())
+ keyspaces.addAll(nodeProbe.getKeyspaces());
+ else
+ keyspaces.add(cmdArgs.get(0));
+
+ for (String keyspace : keyspaces)
+ {
+ if (!nodeProbe.getKeyspaces().contains(keyspace))
+ throw new IllegalArgumentException("Keyspace [" + keyspace + "] does not exist.");
+ }
+
+ return Collections.unmodifiableList(keyspaces);
+ }
+
+ protected String[] parseOptionalColumnFamilies(List<String> cmdArgs)
+ {
+ return cmdArgs.size() <= 1 ? EMPTY_STRING_ARRAY : toArray(cmdArgs.subList(1, cmdArgs.size()), String.class);
+ }
+ }
+
+ @Command(name = "info", description = "Print node information (uptime, load, ...)")
+ public static class Info extends NodeToolCmd
+ {
+ @Option(name = {"-T", "--tokens"}, description = "Display all tokens")
+ private boolean tokens = false;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ boolean gossipInitialized = probe.isInitialized();
+
+ System.out.printf("%-17s: %s%n", "ID", probe.getLocalHostId());
+ System.out.printf("%-17s: %s%n", "Gossip active", gossipInitialized);
+ System.out.printf("%-17s: %s%n", "Thrift active", probe.isThriftServerRunning());
+ System.out.printf("%-17s: %s%n", "Native Transport active", probe.isNativeTransportRunning());
+ System.out.printf("%-17s: %s%n", "Load", probe.getLoadString());
+ if (gossipInitialized)
+ System.out.printf("%-17s: %s%n", "Generation No", probe.getCurrentGenerationNumber());
+ else
+ System.out.printf("%-17s: %s%n", "Generation No", 0);
+
+ // Uptime
+ long secondsUp = probe.getUptime() / 1000;
+ System.out.printf("%-17s: %d%n", "Uptime (seconds)", secondsUp);
+
+ // Memory usage
+ MemoryUsage heapUsage = probe.getHeapMemoryUsage();
+ double memUsed = (double) heapUsage.getUsed() / (1024 * 1024);
+ double memMax = (double) heapUsage.getMax() / (1024 * 1024);
+ System.out.printf("%-17s: %.2f / %.2f%n", "Heap Memory (MB)", memUsed, memMax);
+
+ // Data Center/Rack
+ System.out.printf("%-17s: %s%n", "Data Center", probe.getDataCenter());
+ System.out.printf("%-17s: %s%n", "Rack", probe.getRack());
+
+ // Exceptions
+ System.out.printf("%-17s: %s%n", "Exceptions", probe.getStorageMetric("Exceptions"));
+
+ CacheServiceMBean cacheService = probe.getCacheServiceMBean();
+
+ // Key Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
+ System.out.printf("%-17s: entries %d, size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
+ "Key Cache",
+ probe.getCacheMetric("KeyCache", "Entries"),
+ probe.getCacheMetric("KeyCache", "Size"),
+ probe.getCacheMetric("KeyCache", "Capacity"),
+ probe.getCacheMetric("KeyCache", "Hits"),
+ probe.getCacheMetric("KeyCache", "Requests"),
+ probe.getCacheMetric("KeyCache", "HitRate"),
+ cacheService.getKeyCacheSavePeriodInSeconds());
+
+ // Row Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
+ System.out.printf("%-17s: entries %d, size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
+ "Row Cache",
+ probe.getCacheMetric("RowCache", "Entries"),
+ probe.getCacheMetric("RowCache", "Size"),
+ probe.getCacheMetric("RowCache", "Capacity"),
+ probe.getCacheMetric("RowCache", "Hits"),
+ probe.getCacheMetric("RowCache", "Requests"),
+ probe.getCacheMetric("RowCache", "HitRate"),
+ cacheService.getRowCacheSavePeriodInSeconds());
+
+ // Counter Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
+ System.out.printf("%-17s: entries %d, size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
+ "Counter Cache",
+ probe.getCacheMetric("CounterCache", "Entries"),
+ probe.getCacheMetric("CounterCache", "Size"),
+ probe.getCacheMetric("CounterCache", "Capacity"),
+ probe.getCacheMetric("CounterCache", "Hits"),
+ probe.getCacheMetric("CounterCache", "Requests"),
+ probe.getCacheMetric("CounterCache", "HitRate"),
+ cacheService.getCounterCacheSavePeriodInSeconds());
+
+ // Tokens
+ List<String> tokens = probe.getTokens();
+ if (tokens.size() == 1 || this.tokens)
+ for (String token : tokens)
+ System.out.printf("%-17s: %s%n", "Token", token);
+ else
+ System.out.printf("%-17s: (invoke with -T/--tokens to see all %d tokens)%n", "Token", tokens.size());
+ }
+ }
+
+ @Command(name = "ring", description = "Print information about the token ring")
+ public static class Ring extends NodeToolCmd
+ {
+ @Arguments(description = "Specify a keyspace for accurate ownership information (topology awareness)")
+ private String keyspace = null;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap();
+ LinkedHashMultimap<String, String> endpointsToTokens = LinkedHashMultimap.create();
+ for (Map.Entry<String, String> entry : tokensToEndpoints.entrySet())
+ endpointsToTokens.put(entry.getValue(), entry.getKey());
+
+ int maxAddressLength = Collections.max(endpointsToTokens.keys(), new Comparator<String>()
+ {
+ @Override
+ public int compare(String first, String second)
+ {
+ return ((Integer) first.length()).compareTo(second.length());
+ }
+ }).length();
+
+ String formatPlaceholder = "%%-%ds %%-12s%%-7s%%-8s%%-16s%%-20s%%-44s%%n";
+ String format = format(formatPlaceholder, maxAddressLength);
+
+ // Calculate per-token ownership of the ring
+ Map<InetAddress, Float> ownerships;
+ try
+ {
+ ownerships = probe.effectiveOwnership(keyspace);
+ } catch (IllegalStateException ex)
+ {
+ ownerships = probe.getOwnership();
+ System.out.printf("Note: Ownership information does not include topology; for complete information, specify a keyspace%n");
+ }
+ try
+ {
+ System.out.println();
+ Map<String, Map<InetAddress, Float>> perDcOwnerships = Maps.newLinkedHashMap();
+ // get the different datasets and map to tokens
+ for (Map.Entry<InetAddress, Float> ownership : ownerships.entrySet())
+ {
+ String dc = probe.getEndpointSnitchInfoProxy().getDatacenter(ownership.getKey().getHostAddress());
+ if (!perDcOwnerships.containsKey(dc))
+ perDcOwnerships.put(dc, new LinkedHashMap<InetAddress, Float>());
+ perDcOwnerships.get(dc).put(ownership.getKey(), ownership.getValue());
+ }
+ for (Map.Entry<String, Map<InetAddress, Float>> entry : perDcOwnerships.entrySet())
+ printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue());
+ } catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ if (DatabaseDescriptor.getNumTokens() > 1)
+ {
+ System.out.println(" Warning: \"nodetool ring\" is used to output all the tokens of a node.");
+ System.out.println(" To view status related info of a node use \"nodetool status\" instead.\n");
+ }
+ }
+
+ private void printDc(NodeProbe probe, String format,
+ String dc,
+ LinkedHashMultimap<String, String> endpointsToTokens,
+ Map<InetAddress, Float> filteredOwnerships)
+ {
+ Collection<String> liveNodes = probe.getLiveNodes();
+ Collection<String> deadNodes = probe.getUnreachableNodes();
+ Collection<String> joiningNodes = probe.getJoiningNodes();
+ Collection<String> leavingNodes = probe.getLeavingNodes();
+ Collection<String> movingNodes = probe.getMovingNodes();
+ Map<String, String> loadMap = probe.getLoadMap();
+
+ System.out.println("Datacenter: " + dc);
+ System.out.println("==========");
+
+ // get the total amount of replicas for this dc and the last token in this dc's ring
+ List<String> tokens = new ArrayList<>();
+ String lastToken = "";
+
+ for (Map.Entry<InetAddress, Float> entry : filteredOwnerships.entrySet())
+ {
+ tokens.addAll(endpointsToTokens.get(entry.getKey().getHostAddress()));
+ lastToken = tokens.get(tokens.size() - 1);
+ }
+
+
+ System.out.printf(format, "Address", "Rack", "Status", "State", "Load", "Owns", "Token");
+
+ if (filteredOwnerships.size() > 1)
+ System.out.printf(format, "", "", "", "", "", "", lastToken);
+ else
+ System.out.println();
+
+ for (Map.Entry<InetAddress, Float> entry : filteredOwnerships.entrySet())
+ {
+ String endpoint = entry.getKey().getHostAddress();
+ for (String token : endpointsToTokens.get(endpoint))
+ {
+ String rack;
+ try
+ {
+ rack = probe.getEndpointSnitchInfoProxy().getRack(endpoint);
+ } catch (UnknownHostException e)
+ {
+ rack = "Unknown";
+ }
+
+ String status = liveNodes.contains(endpoint)
+ ? "Up"
+ : deadNodes.contains(endpoint)
+ ? "Down"
+ : "?";
+
+ String state = "Normal";
+
+ if (joiningNodes.contains(endpoint))
+ state = "Joining";
+ else if (leavingNodes.contains(endpoint))
+ state = "Leaving";
+ else if (movingNodes.contains(endpoint))
+ state = "Moving";
+
+ String load = loadMap.containsKey(endpoint)
+ ? loadMap.get(endpoint)
+ : "?";
+ String owns = new DecimalFormat("##0.00%").format(entry.getValue());
+ System.out.printf(format, endpoint, rack, status, state, load, owns, token);
+ }
+ }
+ System.out.println();
+ }
+ }
+
+ @Command(name = "netstats", description = "Print network information on provided host (connecting node by default)")
+ public static class NetStats extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ System.out.printf("Mode: %s%n", probe.getOperationMode());
+ Set<StreamState> statuses = probe.getStreamStatus();
+ if (statuses.isEmpty())
+ System.out.println("Not sending any streams.");
+ for (StreamState status : statuses)
+ {
+ System.out.printf("%s %s%n", status.description, status.planId.toString());
+ for (SessionInfo info : status.sessions)
+ {
+ System.out.printf(" %s%n", info.peer.toString());
+ if (!info.receivingSummaries.isEmpty())
+ {
+ System.out.printf(" Receiving %d files, %d bytes total%n", info.getTotalFilesToReceive(), info.getTotalSizeToReceive());
+ for (ProgressInfo progress : info.getReceivingFiles())
+ {
+ System.out.printf(" %s%n", progress.toString());
+ }
+ }
+ if (!info.sendingSummaries.isEmpty())
+ {
+ System.out.printf(" Sending %d files, %d bytes total%n", info.getTotalFilesToSend(), info.getTotalSizeToSend());
+ for (ProgressInfo progress : info.getSendingFiles())
+ {
+ System.out.printf(" %s%n", progress.toString());
+ }
+ }
+ }
+ }
+
+ System.out.printf("Read Repair Statistics:%nAttempted: %d%nMismatch (Blocking): %d%nMismatch (Background): %d%n", probe.getReadRepairAttempted(), probe.getReadRepairRepairedBlocking(), probe.getReadRepairRepairedBackground());
+
+ MessagingServiceMBean ms = probe.msProxy;
+ System.out.printf("%-25s", "Pool Name");
+ System.out.printf("%10s", "Active");
+ System.out.printf("%10s", "Pending");
+ System.out.printf("%15s%n", "Completed");
+
+ int pending;
+ long completed;
+
+ pending = 0;
+ for (int n : ms.getCommandPendingTasks().values())
+ pending += n;
+ completed = 0;
+ for (long n : ms.getCommandCompletedTasks().values())
+ completed += n;
+ System.out.printf("%-25s%10s%10s%15s%n", "Commands", "n/a", pending, completed);
+
+ pending = 0;
+ for (int n : ms.getResponsePendingTasks().values())
+ pending += n;
+ completed = 0;
+ for (long n : ms.getResponseCompletedTasks().values())
+ completed += n;
+ System.out.printf("%-25s%10s%10s%15s%n", "Responses", "n/a", pending, completed);
+ }
+ }
+
+ @Command(name = "cfstats", description = "Print statistics on column families")
+ public static class CfStats extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace.cfname>...]", description = "List of column families (or keyspace) names")
+ private List<String> cfnames = new ArrayList<>();
+
+ @Option(name = "-i", description = "Ignore the list of column families and display the remaining cfs")
+ private boolean ignore = false;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ OptionFilter filter = new OptionFilter(ignore, cfnames);
+ Map<String, List<ColumnFamilyStoreMBean>> cfstoreMap = new HashMap<>();
+
+ // get a list of column family stores
+ Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> cfamilies = probe.getColumnFamilyStoreMBeanProxies();
+
+ while (cfamilies.hasNext())
+ {
+ Map.Entry<String, ColumnFamilyStoreMBean> entry = cfamilies.next();
+ String keyspaceName = entry.getKey();
+ ColumnFamilyStoreMBean cfsProxy = entry.getValue();
+
+ if (!cfstoreMap.containsKey(keyspaceName) && filter.isColumnFamilyIncluded(entry.getKey(), cfsProxy.getColumnFamilyName()))
+ {
+ List<ColumnFamilyStoreMBean> columnFamilies = new ArrayList<>();
+ columnFamilies.add(cfsProxy);
+ cfstoreMap.put(keyspaceName, columnFamilies);
+ } else if (filter.isColumnFamilyIncluded(entry.getKey(), cfsProxy.getColumnFamilyName()))
+ {
+ cfstoreMap.get(keyspaceName).add(cfsProxy);
+ }
+ }
+
+ // make sure all specified kss and cfs exist
+ filter.verifyKeyspaces(probe.getKeyspaces());
+ filter.verifyColumnFamilies();
+
+ // print out the table statistics
+ for (Map.Entry<String, List<ColumnFamilyStoreMBean>> entry : cfstoreMap.entrySet())
+ {
+ String keyspaceName = entry.getKey();
+ List<ColumnFamilyStoreMBean> columnFamilies = entry.getValue();
+ long keyspaceReadCount = 0;
+ long keyspaceWriteCount = 0;
+ int keyspacePendingTasks = 0;
+ double keyspaceTotalReadTime = 0.0f;
+ double keyspaceTotalWriteTime = 0.0f;
+
+ System.out.println("Keyspace: " + keyspaceName);
+ for (ColumnFamilyStoreMBean cfstore : columnFamilies)
+ {
+ String cfName = cfstore.getColumnFamilyName();
+ long writeCount = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getCount();
+ long readCount = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getCount();
+
+ if (readCount > 0)
+ {
+ keyspaceReadCount += readCount;
+ keyspaceTotalReadTime += (long) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadTotalLatency");
+ }
+ if (writeCount > 0)
+ {
+ keyspaceWriteCount += writeCount;
+ keyspaceTotalWriteTime += (long) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteTotalLatency");
+ }
+ keyspacePendingTasks += (int) probe.getColumnFamilyMetric(keyspaceName, cfName, "PendingTasks");
+ }
+
+ double keyspaceReadLatency = keyspaceReadCount > 0
+ ? keyspaceTotalReadTime / keyspaceReadCount / 1000
+ : Double.NaN;
+ double keyspaceWriteLatency = keyspaceWriteCount > 0
+ ? keyspaceTotalWriteTime / keyspaceWriteCount / 1000
+ : Double.NaN;
+
+ System.out.println("\tRead Count: " + keyspaceReadCount);
+ System.out.println("\tRead Latency: " + format("%s", keyspaceReadLatency) + " ms.");
+ System.out.println("\tWrite Count: " + keyspaceWriteCount);
+ System.out.println("\tWrite Latency: " + format("%s", keyspaceWriteLatency) + " ms.");
+ System.out.println("\tPending Tasks: " + keyspacePendingTasks);
+
+ // print out column family statistics for this keyspace
+ for (ColumnFamilyStoreMBean cfstore : columnFamilies)
+ {
+ String cfName = cfstore.getColumnFamilyName();
+ if (cfName.contains("."))
+ System.out.println("\t\tTable (index): " + cfName);
+ else
+ System.out.println("\t\tTable: " + cfName);
+
+ System.out.println("\t\tSSTable count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveSSTableCount"));
+
+ int[] leveledSStables = cfstore.getSSTableCountPerLevel();
+ if (leveledSStables != null)
+ {
+ System.out.print("\t\tSSTables in each level: [");
+ for (int level = 0; level < leveledSStables.length; level++)
+ {
+ int count = leveledSStables[level];
+ System.out.print(count);
+ long maxCount = 4L; // for L0
+ if (level > 0)
+ maxCount = (long) Math.pow(10, level);
+ // show max threshold for level when exceeded
+ if (count > maxCount)
+ System.out.print("/" + maxCount);
+
+ if (level < leveledSStables.length - 1)
+ System.out.print(", ");
+ else
+ System.out.println("]");
+ }
+ }
+ System.out.println("\t\tSpace used (live), bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveDiskSpaceUsed"));
+ System.out.println("\t\tSpace used (total), bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "TotalDiskSpaceUsed"));
+ System.out.println("\t\tSpace used by snapshots (total), bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "SnapshotsSize"));
+ System.out.println("\t\tSSTable Compression Ratio: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "CompressionRatio"));
+ System.out.println("\t\tMemtable cell count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableColumnsCount"));
+ System.out.println("\t\tMemtable data size, bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableDataSize"));
+ System.out.println("\t\tMemtable switch count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableSwitchCount"));
+ System.out.println("\t\tLocal read count: " + ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getCount());
+ double localReadLatency = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getMean() / 1000;
+ double localRLatency = localReadLatency > 0 ? localReadLatency : Double.NaN;
+ System.out.printf("\t\tLocal read latency: %01.3f ms%n", localRLatency);
+ System.out.println("\t\tLocal write count: " + ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getCount());
+ double localWriteLatency = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getMean() / 1000;
+ double localWLatency = localWriteLatency > 0 ? localWriteLatency : Double.NaN;
+ System.out.printf("\t\tLocal write latency: %01.3f ms%n", localWLatency);
+ System.out.println("\t\tPending tasks: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "PendingTasks"));
+ System.out.println("\t\tBloom filter false positives: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "BloomFilterFalsePositives"));
+ System.out.println("\t\tBloom filter false ratio: " + format("%01.5f", probe.getColumnFamilyMetric(keyspaceName, cfName, "RecentBloomFilterFalseRatio")));
+ System.out.println("\t\tBloom filter space used, bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "BloomFilterDiskSpaceUsed"));
+ System.out.println("\t\tCompacted partition minimum bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MinRowSize"));
+ System.out.println("\t\tCompacted partition maximum bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MaxRowSize"));
+ System.out.println("\t\tCompacted partition mean bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MeanRowSize"));
+ System.out.println("\t\tAverage live cells per slice (last five minutes): " + ((JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveScannedHistogram")).getMean());
+ System.out.println("\t\tAverage tombstones per slice (last five minutes): " + ((JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "TombstoneScannedHistogram")).getMean());
+
+ System.out.println("");
+ }
+ System.out.println("----------------");
+ }
+ }
+
+ /**
+ * Used for filtering keyspaces and columnfamilies to be displayed using the cfstats command.
+ */
+ private static class OptionFilter
+ {
+ private Map<String, List<String>> filter = new HashMap<>();
+ private Map<String, List<String>> verifier = new HashMap<>();
+ private List<String> filterList = new ArrayList<>();
+ private boolean ignoreMode;
+
+ public OptionFilter(boolean ignoreMode, List<String> filterList)
+ {
+ this.filterList.addAll(filterList);
+ this.ignoreMode = ignoreMode;
+
+ for (String s : filterList)
+ {
+ String[] keyValues = s.split("\\.", 2);
+
+ // build the map that stores the ks' and cfs to use
+ if (!filter.containsKey(keyValues[0]))
+ {
+ filter.put(keyValues[0], new ArrayList<String>());
+ verifier.put(keyValues[0], new ArrayList<String>());
+
+ if (keyValues.length == 2)
+ {
+ filter.get(keyValues[0]).add(keyValues[1]);
+ verifier.get(keyValues[0]).add(keyValues[1]);
+ }
+ } else
+ {
+ if (keyValues.length == 2)
+ {
+ filter.get(keyValues[0]).add(keyValues[1]);
+ verifier.get(keyValues[0]).add(keyValues[1]);
+ }
+ }
+ }
+ }
+
+ public boolean isColumnFamilyIncluded(String keyspace, String columnFamily)
+ {
+ // supplying empty params list is treated as wanting to display all kss & cfs
+ if (filterList.isEmpty())
+ return !ignoreMode;
+
+ List<String> cfs = filter.get(keyspace);
+
+ // no such keyspace is in the map
+ if (cfs == null)
+ return ignoreMode;
+ // only a keyspace with no cfs was supplied
+ // so ignore or include (based on the flag) every column family in specified keyspace
+ else if (cfs.size() == 0)
+ return !ignoreMode;
+
+ // keyspace exists, and it contains specific cfs
+ verifier.get(keyspace).remove(columnFamily);
+ return ignoreMode ^ cfs.contains(columnFamily);
+ }
+
+ public void verifyKeyspaces(List<String> keyspaces)
+ {
+ for (String ks : verifier.keySet())
+ if (!keyspaces.contains(ks))
+ throw new IllegalArgumentException("Unknown keyspace: " + ks);
+ }
+
+ public void verifyColumnFamilies()
+ {
+ for (String ks : filter.keySet())
+ if (verifier.get(ks).size() > 0)
+ throw new IllegalArgumentException("Unknown column families: " + verifier.get(ks).toString() + " in keyspace: " + ks);
+ }
+ }
+ }
+
+ @Command(name = "cfhistograms", description = "Print statistic histograms for a given column family")
+ public static class CfHistograms extends NodeToolCmd
+ {
+ @Arguments(usage = "<keyspace> <cfname>", description = "The keyspace and column family name")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 2, "cfhistograms requires ks and cf args");
+
+ String keyspace = args.get(0);
+ String cfname = args.get(1);
+
+ // calculate percentile of row size and column count
+ long[] estimatedRowSize = (long[]) probe.getColumnFamilyMetric(keyspace, cfname, "EstimatedRowSizeHistogram");
+ long[] estimatedColumnCount = (long[]) probe.getColumnFamilyMetric(keyspace, cfname, "EstimatedColumnCountHistogram");
+
+ long[] bucketOffsets = new EstimatedHistogram().getBucketOffsets();
+ EstimatedHistogram rowSizeHist = new EstimatedHistogram(bucketOffsets, estimatedRowSize);
+ EstimatedHistogram columnCountHist = new EstimatedHistogram(bucketOffsets, estimatedColumnCount);
+
+ // build arrays to store percentile values
+ double[] estimatedRowSizePercentiles = new double[7];
+ double[] estimatedColumnCountPercentiles = new double[7];
+ double[] offsetPercentiles = new double[]{0.5, 0.75, 0.95, 0.98, 0.99};
+ for (int i = 0; i < offsetPercentiles.length; i++)
+ {
+ estimatedRowSizePercentiles[i] = rowSizeHist.percentile(offsetPercentiles[i]);
+ estimatedColumnCountPercentiles[i] = columnCountHist.percentile(offsetPercentiles[i]);
+ }
+
+ // min value
+ estimatedRowSizePercentiles[5] = rowSizeHist.min();
+ estimatedColumnCountPercentiles[5] = columnCountHist.min();
+ // max value
+ estimatedRowSizePercentiles[6] = rowSizeHist.max();
+ estimatedColumnCountPercentiles[6] = columnCountHist.max();
+
+ String[] percentiles = new String[]{"50%", "75%", "95%", "98%", "99%", "Min", "Max"};
+ double[] readLatency = probe.metricPercentilesAsArray((JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspace, cfname, "ReadLatency"));
+ double[] writeLatency = probe.metricPercentilesAsArray((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspace, cfname, "WriteLatency"));
+ double[] sstablesPerRead = probe.metricPercentilesAsArray((JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspace, cfname, "SSTablesPerReadHistogram"));
+
+ System.out.println(format("%s/%s histograms", keyspace, cfname));
+ System.out.println(format("%-10s%10s%18s%18s%18s%18s",
+ "Percentile", "SSTables", "Write Latency", "Read Latency", "Partition Size", "Cell Count"));
+ System.out.println(format("%-10s%10s%18s%18s%18s%18s",
+ "", "", "(micros)", "(micros)", "(bytes)", ""));
+
+ for (int i = 0; i < percentiles.length; i++)
+ {
+ System.out.println(format("%-10s%10.2f%18.2f%18.2f%18.0f%18.0f",
+ percentiles[i],
+ sstablesPerRead[i],
+ writeLatency[i],
+ readLatency[i],
+ estimatedRowSizePercentiles[i],
+ estimatedColumnCountPercentiles[i]));
+ }
+ System.out.println();
+ }
+ }
+
+ @Command(name = "cleanup", description = "Triggers the immediate cleanup of keys no longer belonging to a node. By default, clean all keyspaces")
+ public static class Cleanup extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ if (Keyspace.SYSTEM_KS.equals(keyspace))
+ continue;
+
+ try
+ {
+ probe.forceKeyspaceCleanup(keyspace, cfnames);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during cleanup", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "clearsnapshot", description = "Remove the snapshot with the given name from the given keyspaces. If no snapshotName is specified we will remove all snapshots")
+ public static class ClearSnapshot extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspaces>...] ", description = "Remove snapshots from the given keyspaces")
+ private List<String> keyspaces = new ArrayList<>();
+
+ @Option(title = "snapshot_name", name = "-t", description = "Remove the snapshot with a given name")
+ private String snapshotName = EMPTY;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("Requested clearing snapshot(s) for ");
+
+ if (keyspaces.isEmpty())
+ sb.append("[all keyspaces]");
+ else
+ sb.append("[").append(join(keyspaces, ", ")).append("]");
+
+ if (!snapshotName.isEmpty())
+ sb.append(" with snapshot name [").append(snapshotName).append("]");
+
+ System.out.println(sb.toString());
+
+ try
+ {
+ probe.clearSnapshot(snapshotName, toArray(keyspaces, String.class));
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error during clearing snapshots", e);
+ }
+ }
+ }
+
+ @Command(name = "compact", description = "Force a (major) compaction on one or more column families")
+ public static class Compact extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ probe.forceKeyspaceCompaction(keyspace, cfnames);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during compaction", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "flush", description = "Flush one or more column families")
+ public static class Flush extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ probe.forceKeyspaceFlush(keyspace, cfnames);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during flushing", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "scrub", description = "Scrub (rebuild sstables for) one or more column families")
+ public static class Scrub extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Option(title = "disable_snapshot",
+ name = {"-ns", "--no-snapshot"},
+ description = "Scrubbed CFs will be snapshotted first, if disableSnapshot is false. (default false)")
+ private boolean disableSnapshot = false;
+
+ @Option(title = "skip_corrupted",
+ name = {"-s", "--skip-corrupted"},
+ description = "Skip corrupted partitions even when scrubbing counter tables. (default false)")
+ private boolean skipCorrupted = false;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ probe.scrub(disableSnapshot, skipCorrupted, keyspace, cfnames);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during flushing", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "disableautocompaction", description = "Disable autocompaction for the given keyspace and column family")
+ public static class DisableAutoCompaction extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ probe.disableAutoCompaction(keyspace, cfnames);
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error occurred during disabling auto-compaction", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "enableautocompaction", description = "Enable autocompaction for the given keyspace and column family")
+ public static class EnableAutoCompaction extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ probe.enableAutoCompaction(keyspace, cfnames);
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error occurred during enabling auto-compaction", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "upgradesstables", description = "Rewrite sstables (for the requested column families) that are not on the current version (thus upgrading them to said current version)")
+ public static class UpgradeSSTable extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Option(title = "include_all", name = {"-a", "--include-all-sstables"}, description = "Use -a to include all sstables, even those already on the current version")
+ private boolean includeAll = false;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ probe.upgradeSSTables(keyspace, !includeAll, cfnames);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during enabling auto-compaction", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "compactionstats", description = "Print statistics on compactions")
+ public static class CompactionStats extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ int compactionThroughput = probe.getCompactionThroughput();
+ CompactionManagerMBean cm = probe.getCompactionManagerProxy();
+ System.out.println("pending tasks: " + probe.getCompactionMetric("PendingTasks"));
+ if (cm.getCompactions().size() > 0)
+ System.out.printf("%25s%16s%16s%16s%16s%10s%10s%n", "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
+ long remainingBytes = 0;
+ for (Map<String, String> c : cm.getCompactions())
+ {
+ String percentComplete = new Long(c.get("total")) == 0
+ ? "n/a"
+ : new DecimalFormat("0.00").format((double) new Long(c.get("completed")) / new Long(c.get("total")) * 100) + "%";
+ System.out.printf("%25s%16s%16s%16s%16s%10s%10s%n", c.get("taskType"), c.get("keyspace"), c.get("columnfamily"), c.get("completed"), c.get("total"), c.get("unit"), percentComplete);
+ if (c.get("taskType").equals(OperationType.COMPACTION.toString()))
+ remainingBytes += (new Long(c.get("total")) - new Long(c.get("completed")));
+ }
+ long remainingTimeInSecs = compactionThroughput == 0 || remainingBytes == 0
+ ? -1
+ : (remainingBytes) / (1024L * 1024L * compactionThroughput);
+ String remainingTime = remainingTimeInSecs < 0
+ ? "n/a"
+ : format("%dh%02dm%02ds", remainingTimeInSecs / 3600, (remainingTimeInSecs % 3600) / 60, (remainingTimeInSecs % 60));
+
+ System.out.printf("%25s%10s%n", "Active compaction remaining time : ", remainingTime);
+ }
+ }
+
+ @Command(name = "compactionhistory", description = "Print history of compaction")
+ public static class CompactionHistory extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ System.out.println("Compaction History: ");
+
+ TabularData tabularData = probe.getCompactionHistory();
+ if (tabularData.isEmpty())
+ {
+ System.out.printf("There is no compaction history");
+ return;
+ }
+
+ String format = "%-41s%-19s%-29s%-26s%-15s%-15s%s%n";
+ List<String> indexNames = tabularData.getTabularType().getIndexNames();
+ System.out.printf(format, toArray(indexNames, Object.class));
+
+ Set<?> values = tabularData.keySet();
+ for (Object eachValue : values)
+ {
+ List<?> value = (List<?>) eachValue;
+ System.out.printf(format, toArray(value, Object.class));
+ }
+ }
+ }
+
+ @Command(name = "decommission", description = "Decommission the *node I am connecting to*")
+ public static class Decommission extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ try
+ {
+ probe.decommission();
+ } catch (InterruptedException e)
+ {
+ throw new RuntimeException("Error decommissioning node", e);
+ }
+ }
+ }
+
+ @Command(name = "describecluster", description = "Print the name, snitch, partitioner and schema version of a cluster")
+ public static class DescribeCluster extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ // display cluster name, snitch and partitioner
+ System.out.println("Cluster Information:");
+ System.out.println("\tName: " + probe.getClusterName());
+ System.out.println("\tSnitch: " + probe.getEndpointSnitchInfoProxy().getSnitchName());
+ System.out.println("\tPartitioner: " + probe.getPartitioner());
+
+ // display schema version for each node
+ System.out.println("\tSchema versions:");
+ Map<String, List<String>> schemaVersions = probe.getSpProxy().getSchemaVersions();
+ for (String version : schemaVersions.keySet())
+ {
+ System.out.println(format("\t\t%s: %s%n", version, schemaVersions.get(version)));
+ }
+ }
+ }
+
+ @Command(name = "disablebinary", description = "Disable native transport (binary protocol)")
+ public static class DisableBinary extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.stopNativeTransport();
+ }
+ }
+
+ @Command(name = "enablebinary", description = "Reenable native transport (binary protocol)")
+ public static class EnableBinary extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.startNativeTransport();
+ }
+ }
+
+ @Command(name = "enablegossip", description = "Reenable gossip")
+ public static class EnableGossip extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.startGossiping();
+ }
+ }
+
+ @Command(name = "disablegossip", description = "Disable gossip (effectively marking the node down)")
+ public static class DisableGossip extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.stopGossiping();
+ }
+ }
+
+ @Command(name = "enablehandoff", description = "Reenable the future hints storing on the current node")
+ public static class EnableHandoff extends NodeToolCmd
+ {
++ @Arguments(usage = "<dc-name>,<dc-name>", description = "Enable hinted handoff only for these DCs")
++ private List<String> args = new ArrayList<>();
++
+ @Override
+ public void execute(NodeProbe probe)
+ {
- probe.enableHintedHandoff();
++ checkArgument(args.size() <= 1, "enablehandoff does not accept two args");
++ if(args.size() == 1)
++ probe.enableHintedHandoff(args.get(0));
++ else
++ probe.enableHintedHandoff();
+ }
+ }
+
+ @Command(name = "enablethrift", description = "Reenable thrift server")
+ public static class EnableThrift extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.startThriftServer();
+ }
+ }
+
+ @Command(name = "getcompactionthreshold", description = "Print min and max compaction thresholds for a given column family")
+ public static class GetCompactionThreshold extends NodeToolCmd
+ {
+ @Arguments(usage = "<keyspace> <cfname>", description = "The keyspace with a column family")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 2, "getcompactionthreshold requires ks and cf args");
+ String ks = args.get(0);
+ String cf = args.get(1);
+
+ ColumnFamilyStoreMBean cfsProxy = probe.getCfsProxy(ks, cf);
+ System.out.println("Current compaction thresholds for " + ks + "/" + cf + ": \n" +
+ " min = " + cfsProxy.getMinimumCompactionThreshold() + ", " +
+ " max = " + cfsProxy.getMaximumCompactionThreshold());
+ }
+ }
+
+ @Command(name = "getcompactionthroughput", description = "Print the MB/s throughput cap for compaction in the system")
+ public static class GetCompactionThroughput extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ System.out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MB/s");
+ }
+ }
+
+ @Command(name = "getstreamthroughput", description = "Print the MB/s throughput cap for streaming in the system")
+ public static class GetStreamThroughput extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ System.out.println("Current stream throughput: " + probe.getStreamThroughput() + " MB/s");
+ }
+ }
+
+ @Command(name = "getendpoints", description = "Print the end points that owns the key")
+ public static class GetEndpoints extends NodeToolCmd
+ {
+ @Arguments(usage = "<keyspace> <cfname> <key>", description = "The keyspace, the column family, and the key for which we need to find the endpoint")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 3, "getendpoints requires ks, cf and key args");
+ String ks = args.get(0);
+ String cf = args.get(1);
+ String key = args.get(2);
+
+ List<InetAddress> endpoints = probe.getEndpoints(ks, cf, key);
+ for (InetAddress endpoint : endpoints)
+ {
+ System.out.println(endpoint.getHostAddress());
+ }
+ }
+ }
+
+ @Command(name = "getsstables", description = "Print the sstable filenames that own the key")
+ public static class GetSSTables extends NodeToolCmd
+ {
+ @Arguments(usage = "<keyspace> <cfname> <key>", description = "The keyspace, the column family, and the key")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 3, "getsstables requires ks, cf and key args");
+ String ks = args.get(0);
+ String cf = args.get(1);
+ String key = args.get(2);
+
+ List<String> sstables = probe.getSSTables(ks, cf, key);
+ for (String sstable : sstables)
+ {
+ System.out.println(sstable);
+ }
+ }
+ }
+
+ @Command(name = "gossipinfo", description = "Shows the gossip information for the cluster")
+ public static class GossipInfo extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ System.out.println(probe.getGossipInfo());
+ }
+ }
+
+ @Command(name = "invalidatekeycache", description = "Invalidate the key cache")
+ public static class InvalidateKeyCache extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.invalidateKeyCache();
+ }
+ }
+
+ @Command(name = "invalidaterowcache", description = "Invalidate the row cache")
+ public static class InvalidateRowCache extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.invalidateRowCache();
+ }
+ }
+
+ @Command(name = "invalidatecountercache", description = "Invalidate the counter cache")
+ public static class InvalidateCounterCache extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.invalidateCounterCache();
+ }
+ }
+
+ @Command(name = "taketoken", description = "Move the token(s) from the existing owner(s) to this node. For vnodes only. Use \\\\ to escape negative tokens.")
+ public static class TakeToken extends NodeToolCmd
+ {
+ @Arguments(usage = "<token, ...>", description = "Token(s) to take", required = true)
+ private List<String> tokens = new ArrayList<String>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ try
+ {
+ probe.takeTokens(tokens);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Error taking tokens", e);
+ }
+ }
+ }
+
+ @Command(name = "join", description = "Join the ring")
+ public static class Join extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkState(!probe.isJoined(), "This node has already joined the ring.");
+
+ try
+ {
+ probe.joinRing();
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error during joining the ring", e);
+ }
+ }
+ }
+
+ @Command(name = "move", description = "Move node on the token ring to a new token")
+ public static class Move extends NodeToolCmd
+ {
+ @Arguments(usage = "<new token>", description = "The new token. (for negative tokens)", required = true)
+ private String newToken = EMPTY;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ try
+ {
+ probe.move(newToken);
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error during moving node", e);
+ }
+ }
+ }
+
+
+
+ @Command(name = "pausehandoff", description = "Pause hints delivery process")
+ public static class PauseHandoff extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.pauseHintsDelivery();
+ }
+ }
+
+ @Command(name = "resumehandoff", description = "Resume hints delivery process")
+ public static class ResumeHandoff extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.resumeHintsDelivery();
+ }
+ }
+
+
+ @Command(name = "proxyhistograms", description = "Print statistic histograms for network operations")
+ public static class ProxyHistograms extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ String[] percentiles = new String[]{"50%", "75%", "95%", "98%", "99%", "Min", "Max"};
+ double[] readLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("Read"));
+ double[] writeLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("Write"));
+ double[] rangeLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("RangeSlice"));
+
+ System.out.println("proxy histograms");
+ System.out.println(format("%-10s%18s%18s%18s",
+ "Percentile", "Read Latency", "Write Latency", "Range Latency"));
+ System.out.println(format("%-10s%18s%18s%18s",
+ "", "(micros)", "(micros)", "(micros)"));
+ for (int i = 0; i < percentiles.length; i++)
+ {
+ System.out.println(format("%-10s%18.2f%18.2f%18.2f",
+ percentiles[i],
+ readLatency[i],
+ writeLatency[i],
+ rangeLatency[i]));
+ }
+ System.out.println();
+ }
+ }
+
+ @Command(name = "rebuild", description = "Rebuild data by streaming from other nodes (similarly to bootstrap)")
+ public static class Rebuild extends NodeToolCmd
+ {
+ @Arguments(usage = "<src-dc-name>", description = "Name of DC from which to select sources for streaming. By default, pick any DC")
+ private String sourceDataCenterName = null;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.rebuild(sourceDataCenterName);
+ }
+ }
+
+ @Command(name = "refresh", description = "Load newly placed SSTables to the system without restart")
+ public static class Refresh extends NodeToolCmd
+ {
+ @Arguments(usage = "<keyspace> <cfname>", description = "The keyspace and column family name")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 2, "refresh requires ks and cf args");
+ probe.loadNewSSTables(args.get(0), args.get(1));
+ }
+ }
+
+ @Deprecated
+ @Command(name = "removetoken", description = "DEPRECATED (see removenode)", hidden = true)
+ public static class RemoveToken extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ System.err.println("Warn: removetoken is deprecated, please use removenode instead");
+ }
+ }
+
+ @Command(name = "removenode", description = "Show status of current node removal, force completion of pending removal or remove provided ID")
+ public static class RemoveNode extends NodeToolCmd
+ {
+ @Arguments(title = "remove_operation", usage = "<status>|<force>|<ID>", description = "Show status of current node removal, force completion of pending removal, or remove provided ID", required = true)
+ private String removeOperation = EMPTY;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ switch (removeOperation)
+ {
+ case "status":
+ System.out.println("RemovalStatus: " + probe.getRemovalStatus());
+ break;
+ case "force":
+ System.out.println("RemovalStatus: " + probe.getRemovalStatus());
+ probe.forceRemoveCompletion();
+ break;
+ default:
+ probe.removeNode(removeOperation);
+ break;
+ }
+ }
+ }
+
+ @Command(name = "repair", description = "Repair one or more column families")
+ public static class Repair extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Option(title = "parallel", name = {"-par", "--parallel"}, description = "Use -par to carry out a parallel repair")
+ private boolean parallel = false;
+
+ @Option(title = "local_dc", name = {"-local", "--in-local-dc"}, description = "Use -local to only repair against nodes in the same datacenter")
+ private boolean localDC = false;
+
+ @Option(title = "specific_dc", name = {"-dc", "--in-dc"}, description = "Use -dc to repair specific datacenters")
+ private List<String> specificDataCenters = new ArrayList<>();
+
+ @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts")
+ private String startToken = EMPTY;
+
+ @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends")
+ private String endToken = EMPTY;
+
+ @Option(title = "primary_range", name = {"-pr", "--partitioner-range"}, description = "Use -pr to repair only the first range returned by the partitioner")
+ private boolean primaryRange = false;
+
+ @Option(title = "incremental_repair", name = {"-inc", "--incremental"}, description = "Use -inc to use the new incremental repair")
+ private boolean incrementalRepair = false;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ Collection<String> dataCenters = null;
+ if (!specificDataCenters.isEmpty())
+ dataCenters = newArrayList(specificDataCenters);
+ else if (localDC)
+ dataCenters = newArrayList(probe.getDataCenter());
+ if (!startToken.isEmpty() || !endToken.isEmpty())
+ probe.forceRepairRangeAsync(System.out, keyspace, !parallel, dataCenters, startToken, endToken, !incrementalRepair);
+ else
+ probe.forceRepairAsync(System.out, keyspace, !parallel, dataCenters, primaryRange, !incrementalRepair, cfnames);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during repair", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "setcachecapacity", description = "Set global key, row, and counter cache capacities (in MB units)")
+ public static class SetCacheCapacity extends NodeToolCmd
+ {
+ @Arguments(title = "<key-cache-capacity> <row-cache-capacity> <counter-cache-capacity>",
+ usage = "<key-cache-capacity> <row-cache-capacity> <counter-cache-capacity>",
+ description = "Key cache, row cache, and counter cache (in MB)",
+ required = true)
+ private List<Integer> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 3, "setcachecapacity requires key-cache-capacity, row-cache-capacity, and counter-cache-capacity args.");
+ probe.setCacheCapacities(args.get(0), args.get(1), args.get(2));
+ }
+ }
+
+ @Command(name = "setcompactionthreshold", description = "Set min and max compaction thresholds for a given column family")
+ public static class SetCompactionThreshold extends NodeToolCmd
+ {
+ @Arguments(title = "<keyspace> <cfname> <minthreshold> <maxthreshold>", usage = "<keyspace> <cfname> <minthreshold> <maxthreshold>", description = "The keyspace, the column family, min and max threshold", required = true)
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 4, "setcompactionthreshold requires ks, cf, min, and max threshold args.");
+
+ int minthreshold = parseInt(args.get(2));
+ int maxthreshold = parseInt(args.get(3));
+ checkArgument(minthreshold >= 0 && maxthreshold >= 0, "Thresholds must be positive integers");
+ checkArgument(minthreshold <= maxthreshold, "Min threshold cannot be greater than max.");
+ checkArgument(minthreshold >= 2 || maxthreshold == 0, "Min threshold must be at least 2");
+
+ probe.setCompactionThreshold(args.get(0), args.get(1), minthreshold, maxthreshold);
+ }
+ }
+
+ @Command(name = "setcompactionthroughput", description = "Set the MB/s throughput cap for compaction in the system, or 0 to disable throttling")
+ public static class SetCompactionThroughput extends NodeToolCmd
+ {
+ @Arguments(title = "compaction_throughput", usage = "<value_in_mb>", description = "Value in MB, 0 to disable throttling", required = true)
+ private Integer compactionThroughput = null;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.setCompactionThroughput(compactionThroughput);
+ }
+ }
+
+ @Command(name = "setstreamthroughput", description = "Set the MB/s throughput cap for streaming in the system, or 0 to disable throttling")
+ public static class SetStreamThroughput extends NodeToolCmd
+ {
+ @Arguments(title = "stream_throughput", usage = "<value_in_mb>", description = "Value in MB, 0 to disable throttling", required = true)
+ private Integer streamThroughput = null;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.setStreamThroughput(streamThroughput);
+ }
+ }
+
+ @Command(name = "settraceprobability", description = "Sets the probability for tracing any given request to value. 0 disables, 1 enables for all requests, 0 is the default")
+ public static class SetTraceProbability extends NodeToolCmd
+ {
+ @Arguments(title = "trace_probability", usage = "<value>", description = "Trace probability between 0 and 1 (ex: 0.2)", required = true)
+ private Double traceProbability = null;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(traceProbability >= 0 && traceProbability <= 1, "Trace probability must be between 0 and 1");
+ probe.setTraceProbability(traceProbability);
+ }
+ }
+
+ @Command(name = "snapshot", description = "Take a snapshot of specified keyspaces or a snapshot of the specified column family")
+ public static class Snapshot extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspaces...>]", description = "List of keyspaces. By default, all keyspaces")
+ private List<String> keyspaces = new ArrayList<>();
+
+ @Option(title = "cfname", name = {"-cf", "--column-family"}, description = "The column family name (you must specify one and only one keyspace for using this option)")
+ private String columnFamily = null;
+
+ @Option(title = "tag", name = {"-t", "--tag"}, description = "The name of the snapshot")
+ private String snapshotName = Long.toString(System.currentTimeMillis());
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ try
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("Requested creating snapshot(s) for ");
+
+ if (keyspaces.isEmpty())
+ sb.append("[all keyspaces]");
+ else
+ sb.append("[").append(join(keyspaces, ", ")).append("]");
+
+ if (!snapshotName.isEmpty())
+ sb.append(" with snapshot name [").append(snapshotName).append("]");
+
+ System.out.println(sb.toString());
+
+ probe.takeSnapshot(snapshotName, columnFamily, toArray(keyspaces, String.class));
+ System.out.println("Snapshot directory: " + snapshotName);
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error during taking a snapshot", e);
+ }
+ }
+ }
+
+ @Command(name = "listsnapshots", description = "Lists all the snapshots along with the size on disk and true size.")
+ public static class ListSnapshots extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ try
+ {
+ System.out.println("Snapshot Details: ");
+
+ final Map<String,TabularData> snapshotDetails = probe.getSnapshotDetails();
+ if (snapshotDetails.isEmpty())
+ {
+ System.out.printf("There are no snapshots");
+ return;
+ }
+
+ final long trueSnapshotsSize = probe.trueSnapshotsSize();
+ final String format = "%-20s%-29s%-29s%-19s%-19s%n";
+ // display column names only once
+ final List<String> indexNames = snapshotDetails.entrySet().iterator().next().getValue().getTabularType().getIndexNames();
+ System.out.printf(format, (Object[]) indexNames.toArray(new String[indexNames.size()]));
+
+ for (final Map.Entry<String, TabularData> snapshotDetail : snapshotDetails.entrySet())
+ {
+ Set<?> values = snapshotDetail.getValue().keySet();
+ for (Object eachValue : values)
+ {
+ final List<?> value = (List<?>) eachValue;
+ System.out.printf(format, value.toArray(new Object[value.size()]));
+ }
+ }
+
+ System.out.println("\nTotal TrueDiskSpaceUsed: " + FileUtils.stringifyFileSize(trueSnapshotsSize) + "\n");
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Error during list snapshot", e);
+ }
+ }
+ }
+
+ @Command(name = "status", description = "Print cluster information (state, load, IDs, ...)")
+ public static class Status extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace>]", description = "The keyspace name")
+ private String keyspace = null;
+
+ @Option(title = "resolve_ip", name = {"-r", "--resolve-ip"}, description = "Show node domain names instead of IPs")
+ private boolean resolveIp = false;
+
+ private boolean hasEffectiveOwns = false;
+ private boolean isTokenPerNode = true;
+ private int maxAddressLength = 0;
+ private String format = null;
+ private Collection<String> joiningNodes, leavingNodes, movingNodes, liveNodes, unreachableNodes;
+ private Map<String, String> loadMap, hostIDMap, tokensToEndpoints;
+ private EndpointSnitchInfoMBean epSnitchInfo;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ joiningNodes = probe.getJoiningNodes();
+ leavingNodes = probe.getLeavingNodes();
+ movingNodes = probe.getMovingNodes();
+ loadMap = probe.getLoadMap();
+ tokensToEndpoints = probe.getTokenToEndpointMap();
+ liveNodes = probe.getLiveNodes();
+ unreachableNodes = probe.getUnreachableNodes();
+ hostIDMap = probe.getHostIdMap();
+ epSnitchInfo = probe.getEndpointSnitchInfoProxy();
+
+ SetHostStat ownerships;
+ try
+ {
+ ownerships = new SetHostStat(probe.effectiveOwnership(keyspace));
+ hasEffectiveOwns = true;
+ } catch (IllegalStateException e)
+ {
+ ownerships = new SetHostStat(probe.getOwnership());
+ }
+
+ // More tokens then nodes (aka vnodes)?
+ if (new HashSet<>(tokensToEndpoints.values()).size() < tokensToEndpoints.keySet().size())
+ isTokenPerNode = false;
+
+ Map<String, SetHostStat> dcs = getOwnershipByDc(probe, ownerships);
+
+ findMaxAddressLength(dcs);
+
+ // Datacenters
+ for (Map.Entry<String, SetHostStat> dc : dcs.entrySet())
+ {
+ String dcHeader = String.format("Datacenter: %s%n", dc.getKey());
+ System.out.printf(dcHeader);
+ for (int i = 0; i < (dcHeader.length() - 1); i++) System.out.print('=');
+ System.out.println();
+
+ // Legend
+ System.out.println("Status=Up/Down");
+ System.out.println("|/ State=Normal/Leaving/Joining/Moving");
+
+ printNodesHeader(hasEffectiveOwns, isTokenPerNode);
+
+ // Nodes
+ for (HostStat entry : dc.getValue())
+ printNode(probe, entry, hasEffectiveOwns, isTokenPerNode);
+ }
+
+ }
+
+ private void findMaxAddressLength(Map<String, SetHostStat> dcs)
+ {
+ maxAddressLength = 0;
+ for (Map.Entry<String, SetHostStat> dc : dcs.entrySet())
+ {
+ for (HostStat stat : dc.getValue())
+ {
+ maxAddressLength = Math.max(maxAddressLength, stat.ipOrDns().length());
+ }
+ }
+ }
+
+ private void printNodesHeader(boolean hasEffectiveOwns, boolean isTokenPerNode)
+ {
+ String fmt = getFormat(hasEffectiveOwns, isTokenPerNode);
+ String owns = hasEffectiveOwns ? "Owns (effective)" : "Owns";
+
+ if (isTokenPerNode)
+ System.out.printf(fmt, "-", "-", "Address", "Load", owns, "Host ID", "Token", "Rack");
+ else
+ System.out.printf(fmt, "-", "-", "Address", "Load", "Tokens", owns, "Host ID", "Rack");
+ }
+
+ private void printNode(NodeProbe probe, HostStat hostStat, boolean hasEffectiveOwns, boolean isTokenPerNode)
+ {
+ String status, state, load, strOwns, hostID, rack, fmt;
+ fmt = getFormat(hasEffectiveOwns, isTokenPerNode);
+ String endpoint = hostStat.ip;
+ if (liveNodes.contains(endpoint)) status = "U";
+ else if (unreachableNodes.contains(endpoint)) status = "D";
+ else status = "?";
+ if (joiningNodes.contains(endpoint)) state = "J";
+ else if (leavingNodes.contains(endpoint)) state = "L";
+ else if (movingNodes.contains(endpoint)) state = "M";
+ else state = "N";
+
+ load = loadMap.containsKey(endpoint) ? loadMap.get(endpoint) : "?";
+ strOwns = new DecimalFormat("##0.0%").format(hostStat.owns);
+ hostID = hostIDMap.get(endpoint);
+
+ try
+ {
+ rack = epSnitchInfo.getRack(endpoint);
+ } catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ if (isTokenPerNode)
+ {
+ System.out.printf(fmt, status, state, hostStat.ipOrDns(), load, strOwns, hostID, probe.getTokens(endpoint).get(0), rack);
+ } else
+ {
+ int tokens = probe.getTokens(endpoint).size();
+ System.out.printf(fmt, status, state, hostStat.ipOrDns(), load, tokens, strOwns, hostID, rack);
+ }
+ }
+
+ private String getFormat(
+ boolean hasEffectiveOwns,
+ boolean isTokenPerNode)
+ {
+ if (format == null)
+ {
+ StringBuilder buf = new StringBuilder();
+ String addressPlaceholder = String.format("%%-%ds ", maxAddressLength);
+ buf.append("%s%s "); // status
+ buf.append(addressPlaceholder); // address
+ buf.append("%-9s "); // load
+ if (!isTokenPerNode)
+ buf.append("%-6s "); // "Tokens"
+ if (hasEffectiveOwns)
+ buf.append("%-16s "); // "Owns (effective)"
+ else
+ buf.append("%-6s "); // "Owns
+ buf.append("%-36s "); // Host ID
+ if (isTokenPerNode)
+ buf.append("%-39s "); // token
+ buf.append("%s%n"); // "Rack"
+
+ format = buf.toString();
+ }
+
+ return format;
+ }
+
+ private Map<String, SetHostStat> getOwnershipByDc(NodeProbe probe, SetHostStat ownerships)
+ {
+ Map<String, SetHostStat> ownershipByDc = Maps.newLinkedHashMap();
+ EndpointSnitchInfoMBean epSnitchInfo = probe.getEndpointSnitchInfoProxy();
+
+ try
+ {
+ for (HostStat ownership : ownerships)
+ {
+ String dc = epSnitchInfo.getDatacenter(ownership.ip);
+ if (!ownershipByDc.containsKey(dc))
+ ownershipByDc.put(dc, new SetHostStat());
+ ownershipByDc.get(dc).add(ownership);
+ }
+ } catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ return ownershipByDc;
+ }
+
+ class SetHostStat implements Iterable<HostStat>
+ {
+ final List<HostStat> hostStats = new ArrayList<>();
+
+ public SetHostStat()
+ {
+ }
+
+ public SetHostStat(Map<InetAddress, Float> ownerships)
+ {
+ for (Map.Entry<InetAddress, Float> entry : ownerships.entrySet())
+ {
+ hostStats.add(new HostStat(entry));
+ }
+ }
+
+ @Override
<TRUNCATED>
[4/8] Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by jb...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f628bd8a/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeTool.java
index 94bce74,0000000..453491b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@@ -1,2248 -1,0 +1,2255 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.management.MemoryUsage;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import javax.management.openmbean.TabularData;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
+
+import com.yammer.metrics.reporting.JmxReporter;
+import io.airlift.command.*;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStoreMBean;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManagerMBean;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
+import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.service.CacheServiceMBean;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Throwables.getStackTraceAsString;
+import static com.google.common.collect.Iterables.toArray;
+import static com.google.common.collect.Lists.newArrayList;
+import static java.lang.Integer.parseInt;
+import static java.lang.String.format;
+import static org.apache.commons.lang3.ArrayUtils.EMPTY_STRING_ARRAY;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.join;
+
+public class NodeTool
+{
+ private static final String HISTORYFILE = "nodetool.history";
+
+ public static void main(String... args)
+ {
+ List<Class<? extends Runnable>> commands = newArrayList(
+ Help.class,
+ Info.class,
+ Ring.class,
+ NetStats.class,
+ CfStats.class,
+ CfHistograms.class,
+ Cleanup.class,
+ ClearSnapshot.class,
+ Compact.class,
+ Scrub.class,
+ Flush.class,
+ UpgradeSSTable.class,
+ DisableAutoCompaction.class,
+ EnableAutoCompaction.class,
+ CompactionStats.class,
+ CompactionHistory.class,
+ Decommission.class,
+ DescribeCluster.class,
+ DisableBinary.class,
+ EnableBinary.class,
+ EnableGossip.class,
+ DisableGossip.class,
+ EnableHandoff.class,
+ EnableThrift.class,
+ GetCompactionThreshold.class,
+ GetCompactionThroughput.class,
+ GetStreamThroughput.class,
+ GetEndpoints.class,
+ GetSSTables.class,
+ GossipInfo.class,
+ InvalidateKeyCache.class,
+ InvalidateRowCache.class,
+ InvalidateCounterCache.class,
+ Join.class,
+ Move.class,
+ PauseHandoff.class,
+ ResumeHandoff.class,
+ ProxyHistograms.class,
+ Rebuild.class,
+ Refresh.class,
+ RemoveToken.class,
+ RemoveNode.class,
+ Repair.class,
+ SetCacheCapacity.class,
+ SetCompactionThreshold.class,
+ SetCompactionThroughput.class,
+ SetStreamThroughput.class,
+ SetTraceProbability.class,
+ Snapshot.class,
+ ListSnapshots.class,
+ Status.class,
+ StatusBinary.class,
+ StatusThrift.class,
+ Stop.class,
+ StopDaemon.class,
+ Version.class,
+ DescribeRing.class,
+ RebuildIndex.class,
+ RangeKeySample.class,
+ EnableBackup.class,
+ DisableBackup.class,
+ ResetLocalSchema.class,
+ ReloadTriggers.class,
+ SetCacheKeysToSave.class,
+ DisableThrift.class,
+ DisableHandoff.class,
+ Drain.class,
+ TruncateHints.class,
+ TpStats.class,
+ TakeToken.class
+ );
+
+ Cli<Runnable> parser = Cli.<Runnable>builder("nodetool")
+ .withDescription("Manage your Cassandra cluster")
+ .withDefaultCommand(Help.class)
+ .withCommands(commands)
+ .build();
+
+ int status = 0;
+ try
+ {
+ Runnable parse = parser.parse(args);
+ printHistory(args);
+ parse.run();
+ } catch (IllegalArgumentException |
+ IllegalStateException |
+ ParseArgumentsMissingException |
+ ParseArgumentsUnexpectedException |
+ ParseOptionConversionException |
+ ParseOptionMissingException |
+ ParseOptionMissingValueException |
+ ParseCommandMissingException |
+ ParseCommandUnrecognizedException e)
+ {
+ badUse(e);
+ status = 1;
+ } catch (Throwable throwable)
+ {
+ err(Throwables.getRootCause(throwable));
+ status = 2;
+ }
+
+ System.exit(status);
+ }
+
+ private static void printHistory(String... args)
+ {
+ //don't bother to print if no args passed (meaning, nodetool is just printing out the sub-commands list)
+ if (args.length == 0)
+ return;
+
+ String cmdLine = Joiner.on(" ").skipNulls().join(args);
+ cmdLine = cmdLine.replaceFirst("(?<=(-pw|--password))\\s+\\S+", " <hidden>");
+
+ try (FileWriter writer = new FileWriter(new File(FBUtilities.getToolsOutputDirectory(), HISTORYFILE), true))
+ {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ writer.append(sdf.format(new Date())).append(": ").append(cmdLine).append(System.lineSeparator());
+ }
+ catch (IOException ioe)
+ {
+ //quietly ignore any errors about not being able to write out history
+ }
+ }
+
+ private static void badUse(Exception e)
+ {
+ System.out.println("nodetool: " + e.getMessage());
+ System.out.println("See 'nodetool help' or 'nodetool help <command>'.");
+ }
+
+ private static void err(Throwable e)
+ {
+ System.err.println("error: " + e.getMessage());
+ System.err.println("-- StackTrace --");
+ System.err.println(getStackTraceAsString(e));
+ }
+
+ public static abstract class NodeToolCmd implements Runnable
+ {
+
+ @Option(type = OptionType.GLOBAL, name = {"-h", "--host"}, description = "Node hostname or ip address")
+ private String host = "127.0.0.1";
+
+ @Option(type = OptionType.GLOBAL, name = {"-p", "--port"}, description = "Remote jmx agent port number")
+ private String port = "7199";
+
+ @Option(type = OptionType.GLOBAL, name = {"-u", "--username"}, description = "Remote jmx agent username")
+ private String username = EMPTY;
+
+ @Option(type = OptionType.GLOBAL, name = {"-pw", "--password"}, description = "Remote jmx agent password")
+ private String password = EMPTY;
+
+ @Override
+ public void run()
+ {
+ try (NodeProbe probe = connect())
+ {
+ execute(probe);
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error while closing JMX connection", e);
+ }
+
+ }
+
+ protected abstract void execute(NodeProbe probe);
+
+ private NodeProbe connect()
+ {
+ NodeProbe nodeClient = null;
+
+ try
+ {
+ if (username.isEmpty())
+ nodeClient = new NodeProbe(host, parseInt(port));
+ else
+ nodeClient = new NodeProbe(host, parseInt(port), username, password);
+ } catch (IOException e)
+ {
+ Throwable rootCause = Throwables.getRootCause(e);
+ System.err.println(format("nodetool: Failed to connect to '%s:%s' - %s: '%s'.", host, port, rootCause.getClass().getSimpleName(), rootCause.getMessage()));
+ System.exit(1);
+ }
+
+ return nodeClient;
+ }
+
+ protected List<String> parseOptionalKeyspace(List<String> cmdArgs, NodeProbe nodeProbe)
+ {
+ List<String> keyspaces = new ArrayList<>();
+
+ if (cmdArgs == null || cmdArgs.isEmpty())
+ keyspaces.addAll(nodeProbe.getKeyspaces());
+ else
+ keyspaces.add(cmdArgs.get(0));
+
+ for (String keyspace : keyspaces)
+ {
+ if (!nodeProbe.getKeyspaces().contains(keyspace))
+ throw new IllegalArgumentException("Keyspace [" + keyspace + "] does not exist.");
+ }
+
+ return Collections.unmodifiableList(keyspaces);
+ }
+
+ protected String[] parseOptionalColumnFamilies(List<String> cmdArgs)
+ {
+ return cmdArgs.size() <= 1 ? EMPTY_STRING_ARRAY : toArray(cmdArgs.subList(1, cmdArgs.size()), String.class);
+ }
+ }
+
+ @Command(name = "info", description = "Print node information (uptime, load, ...)")
+ public static class Info extends NodeToolCmd
+ {
+ @Option(name = {"-T", "--tokens"}, description = "Display all tokens")
+ private boolean tokens = false;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ boolean gossipInitialized = probe.isInitialized();
+
+ System.out.printf("%-17s: %s%n", "ID", probe.getLocalHostId());
+ System.out.printf("%-17s: %s%n", "Gossip active", gossipInitialized);
+ System.out.printf("%-17s: %s%n", "Thrift active", probe.isThriftServerRunning());
+ System.out.printf("%-17s: %s%n", "Native Transport active", probe.isNativeTransportRunning());
+ System.out.printf("%-17s: %s%n", "Load", probe.getLoadString());
+ if (gossipInitialized)
+ System.out.printf("%-17s: %s%n", "Generation No", probe.getCurrentGenerationNumber());
+ else
+ System.out.printf("%-17s: %s%n", "Generation No", 0);
+
+ // Uptime
+ long secondsUp = probe.getUptime() / 1000;
+ System.out.printf("%-17s: %d%n", "Uptime (seconds)", secondsUp);
+
+ // Memory usage
+ MemoryUsage heapUsage = probe.getHeapMemoryUsage();
+ double memUsed = (double) heapUsage.getUsed() / (1024 * 1024);
+ double memMax = (double) heapUsage.getMax() / (1024 * 1024);
+ System.out.printf("%-17s: %.2f / %.2f%n", "Heap Memory (MB)", memUsed, memMax);
+
+ // Data Center/Rack
+ System.out.printf("%-17s: %s%n", "Data Center", probe.getDataCenter());
+ System.out.printf("%-17s: %s%n", "Rack", probe.getRack());
+
+ // Exceptions
+ System.out.printf("%-17s: %s%n", "Exceptions", probe.getStorageMetric("Exceptions"));
+
+ CacheServiceMBean cacheService = probe.getCacheServiceMBean();
+
+ // Key Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
+ System.out.printf("%-17s: entries %d, size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
+ "Key Cache",
+ probe.getCacheMetric("KeyCache", "Entries"),
+ probe.getCacheMetric("KeyCache", "Size"),
+ probe.getCacheMetric("KeyCache", "Capacity"),
+ probe.getCacheMetric("KeyCache", "Hits"),
+ probe.getCacheMetric("KeyCache", "Requests"),
+ probe.getCacheMetric("KeyCache", "HitRate"),
+ cacheService.getKeyCacheSavePeriodInSeconds());
+
+ // Row Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
+ System.out.printf("%-17s: entries %d, size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
+ "Row Cache",
+ probe.getCacheMetric("RowCache", "Entries"),
+ probe.getCacheMetric("RowCache", "Size"),
+ probe.getCacheMetric("RowCache", "Capacity"),
+ probe.getCacheMetric("RowCache", "Hits"),
+ probe.getCacheMetric("RowCache", "Requests"),
+ probe.getCacheMetric("RowCache", "HitRate"),
+ cacheService.getRowCacheSavePeriodInSeconds());
+
+ // Counter Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
+ System.out.printf("%-17s: entries %d, size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
+ "Counter Cache",
+ probe.getCacheMetric("CounterCache", "Entries"),
+ probe.getCacheMetric("CounterCache", "Size"),
+ probe.getCacheMetric("CounterCache", "Capacity"),
+ probe.getCacheMetric("CounterCache", "Hits"),
+ probe.getCacheMetric("CounterCache", "Requests"),
+ probe.getCacheMetric("CounterCache", "HitRate"),
+ cacheService.getCounterCacheSavePeriodInSeconds());
+
+ // Tokens
+ List<String> tokens = probe.getTokens();
+ if (tokens.size() == 1 || this.tokens)
+ for (String token : tokens)
+ System.out.printf("%-17s: %s%n", "Token", token);
+ else
+ System.out.printf("%-17s: (invoke with -T/--tokens to see all %d tokens)%n", "Token", tokens.size());
+ }
+ }
+
+ @Command(name = "ring", description = "Print information about the token ring")
+ public static class Ring extends NodeToolCmd
+ {
+ @Arguments(description = "Specify a keyspace for accurate ownership information (topology awareness)")
+ private String keyspace = null;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap();
+ LinkedHashMultimap<String, String> endpointsToTokens = LinkedHashMultimap.create();
+ for (Map.Entry<String, String> entry : tokensToEndpoints.entrySet())
+ endpointsToTokens.put(entry.getValue(), entry.getKey());
+
+ int maxAddressLength = Collections.max(endpointsToTokens.keys(), new Comparator<String>()
+ {
+ @Override
+ public int compare(String first, String second)
+ {
+ return ((Integer) first.length()).compareTo(second.length());
+ }
+ }).length();
+
+ String formatPlaceholder = "%%-%ds %%-12s%%-7s%%-8s%%-16s%%-20s%%-44s%%n";
+ String format = format(formatPlaceholder, maxAddressLength);
+
+ // Calculate per-token ownership of the ring
+ Map<InetAddress, Float> ownerships;
+ try
+ {
+ ownerships = probe.effectiveOwnership(keyspace);
+ } catch (IllegalStateException ex)
+ {
+ ownerships = probe.getOwnership();
+ System.out.printf("Note: Ownership information does not include topology; for complete information, specify a keyspace%n");
+ }
+ try
+ {
+ System.out.println();
+ Map<String, Map<InetAddress, Float>> perDcOwnerships = Maps.newLinkedHashMap();
+ // get the different datasets and map to tokens
+ for (Map.Entry<InetAddress, Float> ownership : ownerships.entrySet())
+ {
+ String dc = probe.getEndpointSnitchInfoProxy().getDatacenter(ownership.getKey().getHostAddress());
+ if (!perDcOwnerships.containsKey(dc))
+ perDcOwnerships.put(dc, new LinkedHashMap<InetAddress, Float>());
+ perDcOwnerships.get(dc).put(ownership.getKey(), ownership.getValue());
+ }
+ for (Map.Entry<String, Map<InetAddress, Float>> entry : perDcOwnerships.entrySet())
+ printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue());
+ } catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ if (DatabaseDescriptor.getNumTokens() > 1)
+ {
+ System.out.println(" Warning: \"nodetool ring\" is used to output all the tokens of a node.");
+ System.out.println(" To view status related info of a node use \"nodetool status\" instead.\n");
+ }
+ }
+
+ private void printDc(NodeProbe probe, String format,
+ String dc,
+ LinkedHashMultimap<String, String> endpointsToTokens,
+ Map<InetAddress, Float> filteredOwnerships)
+ {
+ Collection<String> liveNodes = probe.getLiveNodes();
+ Collection<String> deadNodes = probe.getUnreachableNodes();
+ Collection<String> joiningNodes = probe.getJoiningNodes();
+ Collection<String> leavingNodes = probe.getLeavingNodes();
+ Collection<String> movingNodes = probe.getMovingNodes();
+ Map<String, String> loadMap = probe.getLoadMap();
+
+ System.out.println("Datacenter: " + dc);
+ System.out.println("==========");
+
+ // get the total amount of replicas for this dc and the last token in this dc's ring
+ List<String> tokens = new ArrayList<>();
+ String lastToken = "";
+
+ for (Map.Entry<InetAddress, Float> entry : filteredOwnerships.entrySet())
+ {
+ tokens.addAll(endpointsToTokens.get(entry.getKey().getHostAddress()));
+ lastToken = tokens.get(tokens.size() - 1);
+ }
+
+
+ System.out.printf(format, "Address", "Rack", "Status", "State", "Load", "Owns", "Token");
+
+ if (filteredOwnerships.size() > 1)
+ System.out.printf(format, "", "", "", "", "", "", lastToken);
+ else
+ System.out.println();
+
+ for (Map.Entry<InetAddress, Float> entry : filteredOwnerships.entrySet())
+ {
+ String endpoint = entry.getKey().getHostAddress();
+ for (String token : endpointsToTokens.get(endpoint))
+ {
+ String rack;
+ try
+ {
+ rack = probe.getEndpointSnitchInfoProxy().getRack(endpoint);
+ } catch (UnknownHostException e)
+ {
+ rack = "Unknown";
+ }
+
+ String status = liveNodes.contains(endpoint)
+ ? "Up"
+ : deadNodes.contains(endpoint)
+ ? "Down"
+ : "?";
+
+ String state = "Normal";
+
+ if (joiningNodes.contains(endpoint))
+ state = "Joining";
+ else if (leavingNodes.contains(endpoint))
+ state = "Leaving";
+ else if (movingNodes.contains(endpoint))
+ state = "Moving";
+
+ String load = loadMap.containsKey(endpoint)
+ ? loadMap.get(endpoint)
+ : "?";
+ String owns = new DecimalFormat("##0.00%").format(entry.getValue());
+ System.out.printf(format, endpoint, rack, status, state, load, owns, token);
+ }
+ }
+ System.out.println();
+ }
+ }
+
+ @Command(name = "netstats", description = "Print network information on provided host (connecting node by default)")
+ public static class NetStats extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ System.out.printf("Mode: %s%n", probe.getOperationMode());
+ Set<StreamState> statuses = probe.getStreamStatus();
+ if (statuses.isEmpty())
+ System.out.println("Not sending any streams.");
+ for (StreamState status : statuses)
+ {
+ System.out.printf("%s %s%n", status.description, status.planId.toString());
+ for (SessionInfo info : status.sessions)
+ {
+ System.out.printf(" %s%n", info.peer.toString());
+ if (!info.receivingSummaries.isEmpty())
+ {
+ System.out.printf(" Receiving %d files, %d bytes total%n", info.getTotalFilesToReceive(), info.getTotalSizeToReceive());
+ for (ProgressInfo progress : info.getReceivingFiles())
+ {
+ System.out.printf(" %s%n", progress.toString());
+ }
+ }
+ if (!info.sendingSummaries.isEmpty())
+ {
+ System.out.printf(" Sending %d files, %d bytes total%n", info.getTotalFilesToSend(), info.getTotalSizeToSend());
+ for (ProgressInfo progress : info.getSendingFiles())
+ {
+ System.out.printf(" %s%n", progress.toString());
+ }
+ }
+ }
+ }
+
+ System.out.printf("Read Repair Statistics:%nAttempted: %d%nMismatch (Blocking): %d%nMismatch (Background): %d%n", probe.getReadRepairAttempted(), probe.getReadRepairRepairedBlocking(), probe.getReadRepairRepairedBackground());
+
+ MessagingServiceMBean ms = probe.msProxy;
+ System.out.printf("%-25s", "Pool Name");
+ System.out.printf("%10s", "Active");
+ System.out.printf("%10s", "Pending");
+ System.out.printf("%15s%n", "Completed");
+
+ int pending;
+ long completed;
+
+ pending = 0;
+ for (int n : ms.getCommandPendingTasks().values())
+ pending += n;
+ completed = 0;
+ for (long n : ms.getCommandCompletedTasks().values())
+ completed += n;
+ System.out.printf("%-25s%10s%10s%15s%n", "Commands", "n/a", pending, completed);
+
+ pending = 0;
+ for (int n : ms.getResponsePendingTasks().values())
+ pending += n;
+ completed = 0;
+ for (long n : ms.getResponseCompletedTasks().values())
+ completed += n;
+ System.out.printf("%-25s%10s%10s%15s%n", "Responses", "n/a", pending, completed);
+ }
+ }
+
+ @Command(name = "cfstats", description = "Print statistics on column families")
+ public static class CfStats extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace.cfname>...]", description = "List of column families (or keyspace) names")
+ private List<String> cfnames = new ArrayList<>();
+
+ @Option(name = "-i", description = "Ignore the list of column families and display the remaining cfs")
+ private boolean ignore = false;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ OptionFilter filter = new OptionFilter(ignore, cfnames);
+ Map<String, List<ColumnFamilyStoreMBean>> cfstoreMap = new HashMap<>();
+
+ // get a list of column family stores
+ Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> cfamilies = probe.getColumnFamilyStoreMBeanProxies();
+
+ while (cfamilies.hasNext())
+ {
+ Map.Entry<String, ColumnFamilyStoreMBean> entry = cfamilies.next();
+ String keyspaceName = entry.getKey();
+ ColumnFamilyStoreMBean cfsProxy = entry.getValue();
+
+ if (!cfstoreMap.containsKey(keyspaceName) && filter.isColumnFamilyIncluded(entry.getKey(), cfsProxy.getColumnFamilyName()))
+ {
+ List<ColumnFamilyStoreMBean> columnFamilies = new ArrayList<>();
+ columnFamilies.add(cfsProxy);
+ cfstoreMap.put(keyspaceName, columnFamilies);
+ } else if (filter.isColumnFamilyIncluded(entry.getKey(), cfsProxy.getColumnFamilyName()))
+ {
+ cfstoreMap.get(keyspaceName).add(cfsProxy);
+ }
+ }
+
+ // make sure all specified kss and cfs exist
+ filter.verifyKeyspaces(probe.getKeyspaces());
+ filter.verifyColumnFamilies();
+
+ // print out the table statistics
+ for (Map.Entry<String, List<ColumnFamilyStoreMBean>> entry : cfstoreMap.entrySet())
+ {
+ String keyspaceName = entry.getKey();
+ List<ColumnFamilyStoreMBean> columnFamilies = entry.getValue();
+ long keyspaceReadCount = 0;
+ long keyspaceWriteCount = 0;
+ int keyspacePendingTasks = 0;
+ double keyspaceTotalReadTime = 0.0f;
+ double keyspaceTotalWriteTime = 0.0f;
+
+ System.out.println("Keyspace: " + keyspaceName);
+ for (ColumnFamilyStoreMBean cfstore : columnFamilies)
+ {
+ String cfName = cfstore.getColumnFamilyName();
+ long writeCount = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getCount();
+ long readCount = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getCount();
+
+ if (readCount > 0)
+ {
+ keyspaceReadCount += readCount;
+ keyspaceTotalReadTime += (long) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadTotalLatency");
+ }
+ if (writeCount > 0)
+ {
+ keyspaceWriteCount += writeCount;
+ keyspaceTotalWriteTime += (long) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteTotalLatency");
+ }
+ keyspacePendingTasks += (int) probe.getColumnFamilyMetric(keyspaceName, cfName, "PendingTasks");
+ }
+
+ double keyspaceReadLatency = keyspaceReadCount > 0
+ ? keyspaceTotalReadTime / keyspaceReadCount / 1000
+ : Double.NaN;
+ double keyspaceWriteLatency = keyspaceWriteCount > 0
+ ? keyspaceTotalWriteTime / keyspaceWriteCount / 1000
+ : Double.NaN;
+
+ System.out.println("\tRead Count: " + keyspaceReadCount);
+ System.out.println("\tRead Latency: " + format("%s", keyspaceReadLatency) + " ms.");
+ System.out.println("\tWrite Count: " + keyspaceWriteCount);
+ System.out.println("\tWrite Latency: " + format("%s", keyspaceWriteLatency) + " ms.");
+ System.out.println("\tPending Tasks: " + keyspacePendingTasks);
+
+ // print out column family statistics for this keyspace
+ for (ColumnFamilyStoreMBean cfstore : columnFamilies)
+ {
+ String cfName = cfstore.getColumnFamilyName();
+ if (cfName.contains("."))
+ System.out.println("\t\tTable (index): " + cfName);
+ else
+ System.out.println("\t\tTable: " + cfName);
+
+ System.out.println("\t\tSSTable count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveSSTableCount"));
+
+ int[] leveledSStables = cfstore.getSSTableCountPerLevel();
+ if (leveledSStables != null)
+ {
+ System.out.print("\t\tSSTables in each level: [");
+ for (int level = 0; level < leveledSStables.length; level++)
+ {
+ int count = leveledSStables[level];
+ System.out.print(count);
+ long maxCount = 4L; // for L0
+ if (level > 0)
+ maxCount = (long) Math.pow(10, level);
+ // show max threshold for level when exceeded
+ if (count > maxCount)
+ System.out.print("/" + maxCount);
+
+ if (level < leveledSStables.length - 1)
+ System.out.print(", ");
+ else
+ System.out.println("]");
+ }
+ }
+ System.out.println("\t\tSpace used (live), bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveDiskSpaceUsed"));
+ System.out.println("\t\tSpace used (total), bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "TotalDiskSpaceUsed"));
+ System.out.println("\t\tSpace used by snapshots (total), bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "SnapshotsSize"));
+ System.out.println("\t\tSSTable Compression Ratio: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "CompressionRatio"));
+ System.out.println("\t\tMemtable cell count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableColumnsCount"));
+ System.out.println("\t\tMemtable data size, bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableDataSize"));
+ System.out.println("\t\tMemtable switch count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableSwitchCount"));
+ System.out.println("\t\tLocal read count: " + ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getCount());
+ double localReadLatency = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getMean() / 1000;
+ double localRLatency = localReadLatency > 0 ? localReadLatency : Double.NaN;
+ System.out.printf("\t\tLocal read latency: %01.3f ms%n", localRLatency);
+ System.out.println("\t\tLocal write count: " + ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getCount());
+ double localWriteLatency = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getMean() / 1000;
+ double localWLatency = localWriteLatency > 0 ? localWriteLatency : Double.NaN;
+ System.out.printf("\t\tLocal write latency: %01.3f ms%n", localWLatency);
+ System.out.println("\t\tPending tasks: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "PendingTasks"));
+ System.out.println("\t\tBloom filter false positives: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "BloomFilterFalsePositives"));
+ System.out.println("\t\tBloom filter false ratio: " + format("%01.5f", probe.getColumnFamilyMetric(keyspaceName, cfName, "RecentBloomFilterFalseRatio")));
+ System.out.println("\t\tBloom filter space used, bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "BloomFilterDiskSpaceUsed"));
+ System.out.println("\t\tCompacted partition minimum bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MinRowSize"));
+ System.out.println("\t\tCompacted partition maximum bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MaxRowSize"));
+ System.out.println("\t\tCompacted partition mean bytes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MeanRowSize"));
+ System.out.println("\t\tAverage live cells per slice (last five minutes): " + ((JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveScannedHistogram")).getMean());
+ System.out.println("\t\tAverage tombstones per slice (last five minutes): " + ((JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "TombstoneScannedHistogram")).getMean());
+
+ System.out.println("");
+ }
+ System.out.println("----------------");
+ }
+ }
+
+ /**
+ * Used for filtering keyspaces and columnfamilies to be displayed using the cfstats command.
+ */
+ private static class OptionFilter
+ {
+ private Map<String, List<String>> filter = new HashMap<>();
+ private Map<String, List<String>> verifier = new HashMap<>();
+ private List<String> filterList = new ArrayList<>();
+ private boolean ignoreMode;
+
+ public OptionFilter(boolean ignoreMode, List<String> filterList)
+ {
+ this.filterList.addAll(filterList);
+ this.ignoreMode = ignoreMode;
+
+ for (String s : filterList)
+ {
+ String[] keyValues = s.split("\\.", 2);
+
+ // build the map that stores the ks' and cfs to use
+ if (!filter.containsKey(keyValues[0]))
+ {
+ filter.put(keyValues[0], new ArrayList<String>());
+ verifier.put(keyValues[0], new ArrayList<String>());
+
+ if (keyValues.length == 2)
+ {
+ filter.get(keyValues[0]).add(keyValues[1]);
+ verifier.get(keyValues[0]).add(keyValues[1]);
+ }
+ } else
+ {
+ if (keyValues.length == 2)
+ {
+ filter.get(keyValues[0]).add(keyValues[1]);
+ verifier.get(keyValues[0]).add(keyValues[1]);
+ }
+ }
+ }
+ }
+
+ public boolean isColumnFamilyIncluded(String keyspace, String columnFamily)
+ {
+ // supplying empty params list is treated as wanting to display all kss & cfs
+ if (filterList.isEmpty())
+ return !ignoreMode;
+
+ List<String> cfs = filter.get(keyspace);
+
+ // no such keyspace is in the map
+ if (cfs == null)
+ return ignoreMode;
+ // only a keyspace with no cfs was supplied
+ // so ignore or include (based on the flag) every column family in specified keyspace
+ else if (cfs.size() == 0)
+ return !ignoreMode;
+
+ // keyspace exists, and it contains specific cfs
+ verifier.get(keyspace).remove(columnFamily);
+ return ignoreMode ^ cfs.contains(columnFamily);
+ }
+
+ public void verifyKeyspaces(List<String> keyspaces)
+ {
+ for (String ks : verifier.keySet())
+ if (!keyspaces.contains(ks))
+ throw new IllegalArgumentException("Unknown keyspace: " + ks);
+ }
+
+ public void verifyColumnFamilies()
+ {
+ for (String ks : filter.keySet())
+ if (verifier.get(ks).size() > 0)
+ throw new IllegalArgumentException("Unknown column families: " + verifier.get(ks).toString() + " in keyspace: " + ks);
+ }
+ }
+ }
+
+ @Command(name = "cfhistograms", description = "Print statistic histograms for a given column family")
+ public static class CfHistograms extends NodeToolCmd
+ {
+ @Arguments(usage = "<keyspace> <cfname>", description = "The keyspace and column family name")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 2, "cfhistograms requires ks and cf args");
+
+ String keyspace = args.get(0);
+ String cfname = args.get(1);
+
+ // calculate percentile of row size and column count
+ long[] estimatedRowSize = (long[]) probe.getColumnFamilyMetric(keyspace, cfname, "EstimatedRowSizeHistogram");
+ long[] estimatedColumnCount = (long[]) probe.getColumnFamilyMetric(keyspace, cfname, "EstimatedColumnCountHistogram");
+
+ long[] bucketOffsets = new EstimatedHistogram().getBucketOffsets();
+ EstimatedHistogram rowSizeHist = new EstimatedHistogram(bucketOffsets, estimatedRowSize);
+ EstimatedHistogram columnCountHist = new EstimatedHistogram(bucketOffsets, estimatedColumnCount);
+
+ // build arrays to store percentile values
+ double[] estimatedRowSizePercentiles = new double[7];
+ double[] estimatedColumnCountPercentiles = new double[7];
+ double[] offsetPercentiles = new double[]{0.5, 0.75, 0.95, 0.98, 0.99};
+ for (int i = 0; i < offsetPercentiles.length; i++)
+ {
+ estimatedRowSizePercentiles[i] = rowSizeHist.percentile(offsetPercentiles[i]);
+ estimatedColumnCountPercentiles[i] = columnCountHist.percentile(offsetPercentiles[i]);
+ }
+
+ // min value
+ estimatedRowSizePercentiles[5] = rowSizeHist.min();
+ estimatedColumnCountPercentiles[5] = columnCountHist.min();
+ // max value
+ estimatedRowSizePercentiles[6] = rowSizeHist.max();
+ estimatedColumnCountPercentiles[6] = columnCountHist.max();
+
+ String[] percentiles = new String[]{"50%", "75%", "95%", "98%", "99%", "Min", "Max"};
+ double[] readLatency = probe.metricPercentilesAsArray((JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspace, cfname, "ReadLatency"));
+ double[] writeLatency = probe.metricPercentilesAsArray((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspace, cfname, "WriteLatency"));
+ double[] sstablesPerRead = probe.metricPercentilesAsArray((JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspace, cfname, "SSTablesPerReadHistogram"));
+
+ System.out.println(format("%s/%s histograms", keyspace, cfname));
+ System.out.println(format("%-10s%10s%18s%18s%18s%18s",
+ "Percentile", "SSTables", "Write Latency", "Read Latency", "Partition Size", "Cell Count"));
+ System.out.println(format("%-10s%10s%18s%18s%18s%18s",
+ "", "", "(micros)", "(micros)", "(bytes)", ""));
+
+ for (int i = 0; i < percentiles.length; i++)
+ {
+ System.out.println(format("%-10s%10.2f%18.2f%18.2f%18.0f%18.0f",
+ percentiles[i],
+ sstablesPerRead[i],
+ writeLatency[i],
+ readLatency[i],
+ estimatedRowSizePercentiles[i],
+ estimatedColumnCountPercentiles[i]));
+ }
+ System.out.println();
+ }
+ }
+
+ @Command(name = "cleanup", description = "Triggers the immediate cleanup of keys no longer belonging to a node. By default, clean all keyspaces")
+ public static class Cleanup extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ if (Keyspace.SYSTEM_KS.equals(keyspace))
+ continue;
+
+ try
+ {
+ probe.forceKeyspaceCleanup(keyspace, cfnames);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during cleanup", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "clearsnapshot", description = "Remove the snapshot with the given name from the given keyspaces. If no snapshotName is specified we will remove all snapshots")
+ public static class ClearSnapshot extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspaces>...] ", description = "Remove snapshots from the given keyspaces")
+ private List<String> keyspaces = new ArrayList<>();
+
+ @Option(title = "snapshot_name", name = "-t", description = "Remove the snapshot with a given name")
+ private String snapshotName = EMPTY;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("Requested clearing snapshot(s) for ");
+
+ if (keyspaces.isEmpty())
+ sb.append("[all keyspaces]");
+ else
+ sb.append("[").append(join(keyspaces, ", ")).append("]");
+
+ if (!snapshotName.isEmpty())
+ sb.append(" with snapshot name [").append(snapshotName).append("]");
+
+ System.out.println(sb.toString());
+
+ try
+ {
+ probe.clearSnapshot(snapshotName, toArray(keyspaces, String.class));
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error during clearing snapshots", e);
+ }
+ }
+ }
+
+ @Command(name = "compact", description = "Force a (major) compaction on one or more column families")
+ public static class Compact extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ probe.forceKeyspaceCompaction(keyspace, cfnames);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during compaction", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "flush", description = "Flush one or more column families")
+ public static class Flush extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ probe.forceKeyspaceFlush(keyspace, cfnames);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during flushing", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "scrub", description = "Scrub (rebuild sstables for) one or more column families")
+ public static class Scrub extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Option(title = "disable_snapshot",
+ name = {"-ns", "--no-snapshot"},
+ description = "Scrubbed CFs will be snapshotted first, if disableSnapshot is false. (default false)")
+ private boolean disableSnapshot = false;
+
+ @Option(title = "skip_corrupted",
+ name = {"-s", "--skip-corrupted"},
+ description = "Skip corrupted partitions even when scrubbing counter tables. (default false)")
+ private boolean skipCorrupted = false;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ probe.scrub(disableSnapshot, skipCorrupted, keyspace, cfnames);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during flushing", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "disableautocompaction", description = "Disable autocompaction for the given keyspace and column family")
+ public static class DisableAutoCompaction extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ probe.disableAutoCompaction(keyspace, cfnames);
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error occurred during disabling auto-compaction", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "enableautocompaction", description = "Enable autocompaction for the given keyspace and column family")
+ public static class EnableAutoCompaction extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ probe.enableAutoCompaction(keyspace, cfnames);
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error occurred during enabling auto-compaction", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "upgradesstables", description = "Rewrite sstables (for the requested column families) that are not on the current version (thus upgrading them to said current version)")
+ public static class UpgradeSSTable extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Option(title = "include_all", name = {"-a", "--include-all-sstables"}, description = "Use -a to include all sstables, even those already on the current version")
+ private boolean includeAll = false;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ probe.upgradeSSTables(keyspace, !includeAll, cfnames);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during enabling auto-compaction", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "compactionstats", description = "Print statistics on compactions")
+ public static class CompactionStats extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ int compactionThroughput = probe.getCompactionThroughput();
+ CompactionManagerMBean cm = probe.getCompactionManagerProxy();
+ System.out.println("pending tasks: " + probe.getCompactionMetric("PendingTasks"));
+ if (cm.getCompactions().size() > 0)
+ System.out.printf("%25s%16s%16s%16s%16s%10s%10s%n", "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
+ long remainingBytes = 0;
+ for (Map<String, String> c : cm.getCompactions())
+ {
+ String percentComplete = new Long(c.get("total")) == 0
+ ? "n/a"
+ : new DecimalFormat("0.00").format((double) new Long(c.get("completed")) / new Long(c.get("total")) * 100) + "%";
+ System.out.printf("%25s%16s%16s%16s%16s%10s%10s%n", c.get("taskType"), c.get("keyspace"), c.get("columnfamily"), c.get("completed"), c.get("total"), c.get("unit"), percentComplete);
+ if (c.get("taskType").equals(OperationType.COMPACTION.toString()))
+ remainingBytes += (new Long(c.get("total")) - new Long(c.get("completed")));
+ }
+ long remainingTimeInSecs = compactionThroughput == 0 || remainingBytes == 0
+ ? -1
+ : (remainingBytes) / (1024L * 1024L * compactionThroughput);
+ String remainingTime = remainingTimeInSecs < 0
+ ? "n/a"
+ : format("%dh%02dm%02ds", remainingTimeInSecs / 3600, (remainingTimeInSecs % 3600) / 60, (remainingTimeInSecs % 60));
+
+ System.out.printf("%25s%10s%n", "Active compaction remaining time : ", remainingTime);
+ }
+ }
+
+ @Command(name = "compactionhistory", description = "Print history of compaction")
+ public static class CompactionHistory extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ System.out.println("Compaction History: ");
+
+ TabularData tabularData = probe.getCompactionHistory();
+ if (tabularData.isEmpty())
+ {
+ System.out.printf("There is no compaction history");
+ return;
+ }
+
+ String format = "%-41s%-19s%-29s%-26s%-15s%-15s%s%n";
+ List<String> indexNames = tabularData.getTabularType().getIndexNames();
+ System.out.printf(format, toArray(indexNames, Object.class));
+
+ Set<?> values = tabularData.keySet();
+ for (Object eachValue : values)
+ {
+ List<?> value = (List<?>) eachValue;
+ System.out.printf(format, toArray(value, Object.class));
+ }
+ }
+ }
+
+ @Command(name = "decommission", description = "Decommission the *node I am connecting to*")
+ public static class Decommission extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ try
+ {
+ probe.decommission();
+ } catch (InterruptedException e)
+ {
+ throw new RuntimeException("Error decommissioning node", e);
+ }
+ }
+ }
+
+ @Command(name = "describecluster", description = "Print the name, snitch, partitioner and schema version of a cluster")
+ public static class DescribeCluster extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ // display cluster name, snitch and partitioner
+ System.out.println("Cluster Information:");
+ System.out.println("\tName: " + probe.getClusterName());
+ System.out.println("\tSnitch: " + probe.getEndpointSnitchInfoProxy().getSnitchName());
+ System.out.println("\tPartitioner: " + probe.getPartitioner());
+
+ // display schema version for each node
+ System.out.println("\tSchema versions:");
+ Map<String, List<String>> schemaVersions = probe.getSpProxy().getSchemaVersions();
+ for (String version : schemaVersions.keySet())
+ {
+ System.out.println(format("\t\t%s: %s%n", version, schemaVersions.get(version)));
+ }
+ }
+ }
+
+ @Command(name = "disablebinary", description = "Disable native transport (binary protocol)")
+ public static class DisableBinary extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.stopNativeTransport();
+ }
+ }
+
+ @Command(name = "enablebinary", description = "Reenable native transport (binary protocol)")
+ public static class EnableBinary extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.startNativeTransport();
+ }
+ }
+
+ @Command(name = "enablegossip", description = "Reenable gossip")
+ public static class EnableGossip extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.startGossiping();
+ }
+ }
+
+ @Command(name = "disablegossip", description = "Disable gossip (effectively marking the node down)")
+ public static class DisableGossip extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.stopGossiping();
+ }
+ }
+
+ @Command(name = "enablehandoff", description = "Reenable the future hints storing on the current node")
+ public static class EnableHandoff extends NodeToolCmd
+ {
++ @Arguments(usage = "<dc-name>,<dc-name>", description = "Enable hinted handoff only for these DCs")
++ private List<String> args = new ArrayList<>();
++
+ @Override
+ public void execute(NodeProbe probe)
+ {
- probe.enableHintedHandoff();
++ checkArgument(args.size() <= 1, "enablehandoff does not accept two args");
++ if(args.size() == 1)
++ probe.enableHintedHandoff(args.get(0));
++ else
++ probe.enableHintedHandoff();
+ }
+ }
+
+ @Command(name = "enablethrift", description = "Reenable thrift server")
+ public static class EnableThrift extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.startThriftServer();
+ }
+ }
+
+ @Command(name = "getcompactionthreshold", description = "Print min and max compaction thresholds for a given column family")
+ public static class GetCompactionThreshold extends NodeToolCmd
+ {
+ @Arguments(usage = "<keyspace> <cfname>", description = "The keyspace with a column family")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 2, "getcompactionthreshold requires ks and cf args");
+ String ks = args.get(0);
+ String cf = args.get(1);
+
+ ColumnFamilyStoreMBean cfsProxy = probe.getCfsProxy(ks, cf);
+ System.out.println("Current compaction thresholds for " + ks + "/" + cf + ": \n" +
+ " min = " + cfsProxy.getMinimumCompactionThreshold() + ", " +
+ " max = " + cfsProxy.getMaximumCompactionThreshold());
+ }
+ }
+
+ @Command(name = "getcompactionthroughput", description = "Print the MB/s throughput cap for compaction in the system")
+ public static class GetCompactionThroughput extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ System.out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MB/s");
+ }
+ }
+
+ @Command(name = "getstreamthroughput", description = "Print the MB/s throughput cap for streaming in the system")
+ public static class GetStreamThroughput extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ System.out.println("Current stream throughput: " + probe.getStreamThroughput() + " MB/s");
+ }
+ }
+
+ @Command(name = "getendpoints", description = "Print the end points that owns the key")
+ public static class GetEndpoints extends NodeToolCmd
+ {
+ @Arguments(usage = "<keyspace> <cfname> <key>", description = "The keyspace, the column family, and the key for which we need to find the endpoint")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 3, "getendpoints requires ks, cf and key args");
+ String ks = args.get(0);
+ String cf = args.get(1);
+ String key = args.get(2);
+
+ List<InetAddress> endpoints = probe.getEndpoints(ks, cf, key);
+ for (InetAddress endpoint : endpoints)
+ {
+ System.out.println(endpoint.getHostAddress());
+ }
+ }
+ }
+
+ @Command(name = "getsstables", description = "Print the sstable filenames that own the key")
+ public static class GetSSTables extends NodeToolCmd
+ {
+ @Arguments(usage = "<keyspace> <cfname> <key>", description = "The keyspace, the column family, and the key")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 3, "getsstables requires ks, cf and key args");
+ String ks = args.get(0);
+ String cf = args.get(1);
+ String key = args.get(2);
+
+ List<String> sstables = probe.getSSTables(ks, cf, key);
+ for (String sstable : sstables)
+ {
+ System.out.println(sstable);
+ }
+ }
+ }
+
+ @Command(name = "gossipinfo", description = "Shows the gossip information for the cluster")
+ public static class GossipInfo extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ System.out.println(probe.getGossipInfo());
+ }
+ }
+
+ @Command(name = "invalidatekeycache", description = "Invalidate the key cache")
+ public static class InvalidateKeyCache extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.invalidateKeyCache();
+ }
+ }
+
+ @Command(name = "invalidaterowcache", description = "Invalidate the row cache")
+ public static class InvalidateRowCache extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.invalidateRowCache();
+ }
+ }
+
+ @Command(name = "invalidatecountercache", description = "Invalidate the counter cache")
+ public static class InvalidateCounterCache extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.invalidateCounterCache();
+ }
+ }
+
+ @Command(name = "taketoken", description = "Move the token(s) from the existing owner(s) to this node. For vnodes only. Use \\\\ to escape negative tokens.")
+ public static class TakeToken extends NodeToolCmd
+ {
+ @Arguments(usage = "<token, ...>", description = "Token(s) to take", required = true)
+ private List<String> tokens = new ArrayList<String>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ try
+ {
+ probe.takeTokens(tokens);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Error taking tokens", e);
+ }
+ }
+ }
+
+ @Command(name = "join", description = "Join the ring")
+ public static class Join extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkState(!probe.isJoined(), "This node has already joined the ring.");
+
+ try
+ {
+ probe.joinRing();
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error during joining the ring", e);
+ }
+ }
+ }
+
+ @Command(name = "move", description = "Move node on the token ring to a new token")
+ public static class Move extends NodeToolCmd
+ {
+ @Arguments(usage = "<new token>", description = "The new token. (for negative tokens)", required = true)
+ private String newToken = EMPTY;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ try
+ {
+ probe.move(newToken);
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error during moving node", e);
+ }
+ }
+ }
+
+
+
+ @Command(name = "pausehandoff", description = "Pause hints delivery process")
+ public static class PauseHandoff extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.pauseHintsDelivery();
+ }
+ }
+
+ @Command(name = "resumehandoff", description = "Resume hints delivery process")
+ public static class ResumeHandoff extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.resumeHintsDelivery();
+ }
+ }
+
+
+ @Command(name = "proxyhistograms", description = "Print statistic histograms for network operations")
+ public static class ProxyHistograms extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ String[] percentiles = new String[]{"50%", "75%", "95%", "98%", "99%", "Min", "Max"};
+ double[] readLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("Read"));
+ double[] writeLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("Write"));
+ double[] rangeLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("RangeSlice"));
+
+ System.out.println("proxy histograms");
+ System.out.println(format("%-10s%18s%18s%18s",
+ "Percentile", "Read Latency", "Write Latency", "Range Latency"));
+ System.out.println(format("%-10s%18s%18s%18s",
+ "", "(micros)", "(micros)", "(micros)"));
+ for (int i = 0; i < percentiles.length; i++)
+ {
+ System.out.println(format("%-10s%18.2f%18.2f%18.2f",
+ percentiles[i],
+ readLatency[i],
+ writeLatency[i],
+ rangeLatency[i]));
+ }
+ System.out.println();
+ }
+ }
+
+ @Command(name = "rebuild", description = "Rebuild data by streaming from other nodes (similarly to bootstrap)")
+ public static class Rebuild extends NodeToolCmd
+ {
+ @Arguments(usage = "<src-dc-name>", description = "Name of DC from which to select sources for streaming. By default, pick any DC")
+ private String sourceDataCenterName = null;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.rebuild(sourceDataCenterName);
+ }
+ }
+
+ @Command(name = "refresh", description = "Load newly placed SSTables to the system without restart")
+ public static class Refresh extends NodeToolCmd
+ {
+ @Arguments(usage = "<keyspace> <cfname>", description = "The keyspace and column family name")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 2, "refresh requires ks and cf args");
+ probe.loadNewSSTables(args.get(0), args.get(1));
+ }
+ }
+
+ @Deprecated
+ @Command(name = "removetoken", description = "DEPRECATED (see removenode)", hidden = true)
+ public static class RemoveToken extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ System.err.println("Warn: removetoken is deprecated, please use removenode instead");
+ }
+ }
+
+ @Command(name = "removenode", description = "Show status of current node removal, force completion of pending removal or remove provided ID")
+ public static class RemoveNode extends NodeToolCmd
+ {
+ @Arguments(title = "remove_operation", usage = "<status>|<force>|<ID>", description = "Show status of current node removal, force completion of pending removal, or remove provided ID", required = true)
+ private String removeOperation = EMPTY;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ switch (removeOperation)
+ {
+ case "status":
+ System.out.println("RemovalStatus: " + probe.getRemovalStatus());
+ break;
+ case "force":
+ System.out.println("RemovalStatus: " + probe.getRemovalStatus());
+ probe.forceRemoveCompletion();
+ break;
+ default:
+ probe.removeNode(removeOperation);
+ break;
+ }
+ }
+ }
+
+ @Command(name = "repair", description = "Repair one or more column families")
+ public static class Repair extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
+ private List<String> args = new ArrayList<>();
+
+ @Option(title = "parallel", name = {"-par", "--parallel"}, description = "Use -par to carry out a parallel repair")
+ private boolean parallel = false;
+
+ @Option(title = "local_dc", name = {"-local", "--in-local-dc"}, description = "Use -local to only repair against nodes in the same datacenter")
+ private boolean localDC = false;
+
+ @Option(title = "specific_dc", name = {"-dc", "--in-dc"}, description = "Use -dc to repair specific datacenters")
+ private List<String> specificDataCenters = new ArrayList<>();
+
+ @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts")
+ private String startToken = EMPTY;
+
+ @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends")
+ private String endToken = EMPTY;
+
+ @Option(title = "primary_range", name = {"-pr", "--partitioner-range"}, description = "Use -pr to repair only the first range returned by the partitioner")
+ private boolean primaryRange = false;
+
+ @Option(title = "incremental_repair", name = {"-inc", "--incremental"}, description = "Use -inc to use the new incremental repair")
+ private boolean incrementalRepair = false;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalColumnFamilies(args);
+
+ for (String keyspace : keyspaces)
+ {
+ try
+ {
+ Collection<String> dataCenters = null;
+ if (!specificDataCenters.isEmpty())
+ dataCenters = newArrayList(specificDataCenters);
+ else if (localDC)
+ dataCenters = newArrayList(probe.getDataCenter());
+ if (!startToken.isEmpty() || !endToken.isEmpty())
+ probe.forceRepairRangeAsync(System.out, keyspace, !parallel, dataCenters, startToken, endToken, !incrementalRepair);
+ else
+ probe.forceRepairAsync(System.out, keyspace, !parallel, dataCenters, primaryRange, !incrementalRepair, cfnames);
+ } catch (Exception e)
+ {
+ throw new RuntimeException("Error occurred during repair", e);
+ }
+ }
+ }
+ }
+
+ @Command(name = "setcachecapacity", description = "Set global key, row, and counter cache capacities (in MB units)")
+ public static class SetCacheCapacity extends NodeToolCmd
+ {
+ @Arguments(title = "<key-cache-capacity> <row-cache-capacity> <counter-cache-capacity>",
+ usage = "<key-cache-capacity> <row-cache-capacity> <counter-cache-capacity>",
+ description = "Key cache, row cache, and counter cache (in MB)",
+ required = true)
+ private List<Integer> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 3, "setcachecapacity requires key-cache-capacity, row-cache-capacity, and counter-cache-capacity args.");
+ probe.setCacheCapacities(args.get(0), args.get(1), args.get(2));
+ }
+ }
+
+ @Command(name = "setcompactionthreshold", description = "Set min and max compaction thresholds for a given column family")
+ public static class SetCompactionThreshold extends NodeToolCmd
+ {
+ @Arguments(title = "<keyspace> <cfname> <minthreshold> <maxthreshold>", usage = "<keyspace> <cfname> <minthreshold> <maxthreshold>", description = "The keyspace, the column family, min and max threshold", required = true)
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 4, "setcompactionthreshold requires ks, cf, min, and max threshold args.");
+
+ int minthreshold = parseInt(args.get(2));
+ int maxthreshold = parseInt(args.get(3));
+ checkArgument(minthreshold >= 0 && maxthreshold >= 0, "Thresholds must be positive integers");
+ checkArgument(minthreshold <= maxthreshold, "Min threshold cannot be greater than max.");
+ checkArgument(minthreshold >= 2 || maxthreshold == 0, "Min threshold must be at least 2");
+
+ probe.setCompactionThreshold(args.get(0), args.get(1), minthreshold, maxthreshold);
+ }
+ }
+
+ @Command(name = "setcompactionthroughput", description = "Set the MB/s throughput cap for compaction in the system, or 0 to disable throttling")
+ public static class SetCompactionThroughput extends NodeToolCmd
+ {
+ @Arguments(title = "compaction_throughput", usage = "<value_in_mb>", description = "Value in MB, 0 to disable throttling", required = true)
+ private Integer compactionThroughput = null;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.setCompactionThroughput(compactionThroughput);
+ }
+ }
+
+ @Command(name = "setstreamthroughput", description = "Set the MB/s throughput cap for streaming in the system, or 0 to disable throttling")
+ public static class SetStreamThroughput extends NodeToolCmd
+ {
+ @Arguments(title = "stream_throughput", usage = "<value_in_mb>", description = "Value in MB, 0 to disable throttling", required = true)
+ private Integer streamThroughput = null;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ probe.setStreamThroughput(streamThroughput);
+ }
+ }
+
+ @Command(name = "settraceprobability", description = "Sets the probability for tracing any given request to value. 0 disables, 1 enables for all requests, 0 is the default")
+ public static class SetTraceProbability extends NodeToolCmd
+ {
+ @Arguments(title = "trace_probability", usage = "<value>", description = "Trace probability between 0 and 1 (ex: 0.2)", required = true)
+ private Double traceProbability = null;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(traceProbability >= 0 && traceProbability <= 1, "Trace probability must be between 0 and 1");
+ probe.setTraceProbability(traceProbability);
+ }
+ }
+
+ @Command(name = "snapshot", description = "Take a snapshot of specified keyspaces or a snapshot of the specified column family")
+ public static class Snapshot extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspaces...>]", description = "List of keyspaces. By default, all keyspaces")
+ private List<String> keyspaces = new ArrayList<>();
+
+ @Option(title = "cfname", name = {"-cf", "--column-family"}, description = "The column family name (you must specify one and only one keyspace for using this option)")
+ private String columnFamily = null;
+
+ @Option(title = "tag", name = {"-t", "--tag"}, description = "The name of the snapshot")
+ private String snapshotName = Long.toString(System.currentTimeMillis());
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ try
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("Requested creating snapshot(s) for ");
+
+ if (keyspaces.isEmpty())
+ sb.append("[all keyspaces]");
+ else
+ sb.append("[").append(join(keyspaces, ", ")).append("]");
+
+ if (!snapshotName.isEmpty())
+ sb.append(" with snapshot name [").append(snapshotName).append("]");
+
+ System.out.println(sb.toString());
+
+ probe.takeSnapshot(snapshotName, columnFamily, toArray(keyspaces, String.class));
+ System.out.println("Snapshot directory: " + snapshotName);
+ } catch (IOException e)
+ {
+ throw new RuntimeException("Error during taking a snapshot", e);
+ }
+ }
+ }
+
+ @Command(name = "listsnapshots", description = "Lists all the snapshots along with the size on disk and true size.")
+ public static class ListSnapshots extends NodeToolCmd
+ {
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ try
+ {
+ System.out.println("Snapshot Details: ");
+
+ final Map<String,TabularData> snapshotDetails = probe.getSnapshotDetails();
+ if (snapshotDetails.isEmpty())
+ {
+ System.out.printf("There are no snapshots");
+ return;
+ }
+
+ final long trueSnapshotsSize = probe.trueSnapshotsSize();
+ final String format = "%-20s%-29s%-29s%-19s%-19s%n";
+ // display column names only once
+ final List<String> indexNames = snapshotDetails.entrySet().iterator().next().getValue().getTabularType().getIndexNames();
+ System.out.printf(format, (Object[]) indexNames.toArray(new String[indexNames.size()]));
+
+ for (final Map.Entry<String, TabularData> snapshotDetail : snapshotDetails.entrySet())
+ {
+ Set<?> values = snapshotDetail.getValue().keySet();
+ for (Object eachValue : values)
+ {
+ final List<?> value = (List<?>) eachValue;
+ System.out.printf(format, value.toArray(new Object[value.size()]));
+ }
+ }
+
+ System.out.println("\nTotal TrueDiskSpaceUsed: " + FileUtils.stringifyFileSize(trueSnapshotsSize) + "\n");
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Error during list snapshot", e);
+ }
+ }
+ }
+
+ @Command(name = "status", description = "Print cluster information (state, load, IDs, ...)")
+ public static class Status extends NodeToolCmd
+ {
+ @Arguments(usage = "[<keyspace>]", description = "The keyspace name")
+ private String keyspace = null;
+
+ @Option(title = "resolve_ip", name = {"-r", "--resolve-ip"}, description = "Show node domain names instead of IPs")
+ private boolean resolveIp = false;
+
+ private boolean hasEffectiveOwns = false;
+ private boolean isTokenPerNode = true;
+ private int maxAddressLength = 0;
+ private String format = null;
+ private Collection<String> joiningNodes, leavingNodes, movingNodes, liveNodes, unreachableNodes;
+ private Map<String, String> loadMap, hostIDMap, tokensToEndpoints;
+ private EndpointSnitchInfoMBean epSnitchInfo;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ joiningNodes = probe.getJoiningNodes();
+ leavingNodes = probe.getLeavingNodes();
+ movingNodes = probe.getMovingNodes();
+ loadMap = probe.getLoadMap();
+ tokensToEndpoints = probe.getTokenToEndpointMap();
+ liveNodes = probe.getLiveNodes();
+ unreachableNodes = probe.getUnreachableNodes();
+ hostIDMap = probe.getHostIdMap();
+ epSnitchInfo = probe.getEndpointSnitchInfoProxy();
+
+ SetHostStat ownerships;
+ try
+ {
+ ownerships = new SetHostStat(probe.effectiveOwnership(keyspace));
+ hasEffectiveOwns = true;
+ } catch (IllegalStateException e)
+ {
+ ownerships = new SetHostStat(probe.getOwnership());
+ }
+
+ // More tokens then nodes (aka vnodes)?
+ if (new HashSet<>(tokensToEndpoints.values()).size() < tokensToEndpoints.keySet().size())
+ isTokenPerNode = false;
+
+ Map<String, SetHostStat> dcs = getOwnershipByDc(probe, ownerships);
+
+ findMaxAddressLength(dcs);
+
+ // Datacenters
+ for (Map.Entry<String, SetHostStat> dc : dcs.entrySet())
+ {
+ String dcHeader = String.format("Datacenter: %s%n", dc.getKey());
+ System.out.printf(dcHeader);
+ for (int i = 0; i < (dcHeader.length() - 1); i++) System.out.print('=');
+ System.out.println();
+
+ // Legend
+ System.out.println("Status=Up/Down");
+ System.out.println("|/ State=Normal/Leaving/Joining/Moving");
+
+ printNodesHeader(hasEffectiveOwns, isTokenPerNode);
+
+ // Nodes
+ for (HostStat entry : dc.getValue())
+ printNode(probe, entry, hasEffectiveOwns, isTokenPerNode);
+ }
+
+ }
+
+ private void findMaxAddressLength(Map<String, SetHostStat> dcs)
+ {
+ maxAddressLength = 0;
+ for (Map.Entry<String, SetHostStat> dc : dcs.entrySet())
+ {
+ for (HostStat stat : dc.getValue())
+ {
+ maxAddressLength = Math.max(maxAddressLength, stat.ipOrDns().length());
+ }
+ }
+ }
+
+ private void printNodesHeader(boolean hasEffectiveOwns, boolean isTokenPerNode)
+ {
+ String fmt = getFormat(hasEffectiveOwns, isTokenPerNode);
+ String owns = hasEffectiveOwns ? "Owns (effective)" : "Owns";
+
+ if (isTokenPerNode)
+ System.out.printf(fmt, "-", "-", "Address", "Load", owns, "Host ID", "Token", "Rack");
+ else
+ System.out.printf(fmt, "-", "-", "Address", "Load", "Tokens", owns, "Host ID", "Rack");
+ }
+
+ private void printNode(NodeProbe probe, HostStat hostStat, boolean hasEffectiveOwns, boolean isTokenPerNode)
+ {
+ String status, state, load, strOwns, hostID, rack, fmt;
+ fmt = getFormat(hasEffectiveOwns, isTokenPerNode);
+ String endpoint = hostStat.ip;
+ if (liveNodes.contains(endpoint)) status = "U";
+ else if (unreachableNodes.contains(endpoint)) status = "D";
+ else status = "?";
+ if (joiningNodes.contains(endpoint)) state = "J";
+ else if (leavingNodes.contains(endpoint)) state = "L";
+ else if (movingNodes.contains(endpoint)) state = "M";
+ else state = "N";
+
+ load = loadMap.containsKey(endpoint) ? loadMap.get(endpoint) : "?";
+ strOwns = new DecimalFormat("##0.0%").format(hostStat.owns);
+ hostID = hostIDMap.get(endpoint);
+
+ try
+ {
+ rack = epSnitchInfo.getRack(endpoint);
+ } catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ if (isTokenPerNode)
+ {
+ System.out.printf(fmt, status, state, hostStat.ipOrDns(), load, strOwns, hostID, probe.getTokens(endpoint).get(0), rack);
+ } else
+ {
+ int tokens = probe.getTokens(endpoint).size();
+ System.out.printf(fmt, status, state, hostStat.ipOrDns(), load, tokens, strOwns, hostID, rack);
+ }
+ }
+
+ private String getFormat(
+ boolean hasEffectiveOwns,
+ boolean isTokenPerNode)
+ {
+ if (format == null)
+ {
+ StringBuilder buf = new StringBuilder();
+ String addressPlaceholder = String.format("%%-%ds ", maxAddressLength);
+ buf.append("%s%s "); // status
+ buf.append(addressPlaceholder); // address
+ buf.append("%-9s "); // load
+ if (!isTokenPerNode)
+ buf.append("%-6s "); // "Tokens"
+ if (hasEffectiveOwns)
+ buf.append("%-16s "); // "Owns (effective)"
+ else
+ buf.append("%-6s "); // "Owns
+ buf.append("%-36s "); // Host ID
+ if (isTokenPerNode)
+ buf.append("%-39s "); // token
+ buf.append("%s%n"); // "Rack"
+
+ format = buf.toString();
+ }
+
+ return format;
+ }
+
+ private Map<String, SetHostStat> getOwnershipByDc(NodeProbe probe, SetHostStat ownerships)
+ {
+ Map<String, SetHostStat> ownershipByDc = Maps.newLinkedHashMap();
+ EndpointSnitchInfoMBean epSnitchInfo = probe.getEndpointSnitchInfoProxy();
+
+ try
+ {
+ for (HostStat ownership : ownerships)
+ {
+ String dc = epSnitchInfo.getDatacenter(ownership.ip);
+ if (!ownershipByDc.containsKey(dc))
+ ownershipByDc.put(dc, new SetHostStat());
+ ownershipByDc.get(dc).add(ownership);
+ }
+ } catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ return ownershipByDc;
+ }
+
+ class SetHostStat implements Iterable<HostStat>
+ {
+ final List<HostStat> hostStats = new ArrayList<>();
+
+ public SetHostStat()
+ {
+ }
+
+ public SetHostStat(Map<InetAddress, Float> ownerships)
+ {
+ for (Map.Entry<InetAddress, Float> entry : ownerships.entrySet())
+ {
+ hostStats.add(new HostStat(entry));
+ }
+ }
+
+ @Override
<TRUNCATED>
[2/8] git commit: Allow per-dc enabling of hints patch by Sankalp
Kohli; reviewed by Lyuben Todorov for CASSANDRA-6157
Posted by jb...@apache.org.
Allow per-dc enabling of hints
patch by Sankalp Kohli; reviewed by Lyuben Todorov for CASSANDRA-6157
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/500c62d6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/500c62d6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/500c62d6
Branch: refs/heads/cassandra-2.1
Commit: 500c62d6b98f5b5c15b91f1d38f0132b846c6b48
Parents: 972cffd
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Feb 18 11:21:13 2014 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Feb 18 11:21:13 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
build.xml | 2 +
conf/cassandra.yaml | 3 +
lib/licenses/super-csv-2.1.0.txt | 202 +++++++++++++++++++
lib/super-csv-2.1.0.jar | Bin 0 -> 91473 bytes
.../org/apache/cassandra/config/Config.java | 49 ++++-
.../cassandra/config/DatabaseDescriptor.java | 40 +++-
.../config/YamlConfigurationLoader.java | 1 +
.../apache/cassandra/service/StorageProxy.java | 22 +-
.../cassandra/service/StorageProxyMBean.java | 3 +
.../org/apache/cassandra/tools/NodeCmd.java | 5 +-
.../org/apache/cassandra/tools/NodeProbe.java | 5 +
12 files changed, 329 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bdfec11..f0c116f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.6
+ * Allow per-dc enabling of hints (CASSANDRA-6157)
* Add compatibility for Hadoop 0.2.x (CASSANDRA-5201)
* Fix EstimatedHistogram races (CASSANDRA-6682)
* Failure detector correctly converts initial value to nanos (CASSANDRA-6658)
@@ -27,6 +28,7 @@ Merged from 1.2:
* Fix SecondaryIndexManager#deleteFromIndexes() (CASSANDRA-6711)
* Fix snapshot repair not snapshotting coordinator itself (CASSANDRA-6713)
+
2.0.5
* Reduce garbage generated by bloom filter lookups (CASSANDRA-6609)
* Add ks.cf names to tombstone logging (CASSANDRA-6597)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index fb774f6..77b2639 100644
--- a/build.xml
+++ b/build.xml
@@ -381,6 +381,7 @@
<dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" />
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
<dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
+ <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
</dependencyManagement>
<developer id="alakshman" name="Avinash Lakshman"/>
<developer id="antelder" name="Anthony Elder"/>
@@ -461,6 +462,7 @@
<dependency groupId="com.yammer.metrics" artifactId="metrics-core"/>
<dependency groupId="com.addthis.metrics" artifactId="reporter-config"/>
<dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.3"/>
+ <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
<dependency groupId="log4j" artifactId="log4j"/>
<!-- cassandra has a hard dependency on log4j, so force slf4j's log4j provider at runtime -->
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index bfe60c4..4c9ad67 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -29,6 +29,9 @@ num_tokens: 256
# that do not have vnodes enabled.
# initial_token:
+# May either be "true" or "false" to enable globally, or contain a list
+# of data centers to enable per-datacenter.
+# hinted_handoff_enabled: DC1,DC2
# See http://wiki.apache.org/cassandra/HintedHandoff
hinted_handoff_enabled: true
# this defines the maximum amount of time a dead host will have hints
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/lib/licenses/super-csv-2.1.0.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/super-csv-2.1.0.txt b/lib/licenses/super-csv-2.1.0.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/lib/licenses/super-csv-2.1.0.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/lib/super-csv-2.1.0.jar
----------------------------------------------------------------------
diff --git a/lib/super-csv-2.1.0.jar b/lib/super-csv-2.1.0.jar
new file mode 100644
index 0000000..6a85716
Binary files /dev/null and b/lib/super-csv-2.1.0.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2fa49f3..ceb8df0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -17,8 +17,18 @@
*/
package org.apache.cassandra.config;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.supercsv.io.CsvListReader;
+import org.supercsv.prefs.CsvPreference;
+
import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.NativeAllocator;
import org.apache.cassandra.utils.FBUtilities;
@@ -38,7 +48,9 @@ public class Config
public String partitioner;
public Boolean auto_bootstrap = true;
- public volatile Boolean hinted_handoff_enabled = true;
+ public volatile boolean hinted_handoff_enabled_global = true;
+ public String hinted_handoff_enabled;
+ public Set<String> hinted_handoff_enabled_by_dc = Sets.newConcurrentHashSet();
public volatile Integer max_hint_window_in_ms = 3600 * 1000; // one hour
public SeedProviderDef seed_provider;
@@ -185,6 +197,9 @@ public class Config
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;
+ private static final CsvPreference STANDARD_SURROUNDING_SPACES_NEED_QUOTES = new CsvPreference.Builder(CsvPreference.STANDARD_PREFERENCE)
+ .surroundingSpacesNeedQuotes(true).build();
+
public static boolean getOutboundBindAny()
{
return outboundBindAny;
@@ -205,6 +220,38 @@ public class Config
isClientMode = clientMode;
}
+ public void configHintedHandoff() throws ConfigurationException
+ {
+ if (hinted_handoff_enabled != null && !hinted_handoff_enabled.isEmpty())
+ {
+ if (hinted_handoff_enabled.toLowerCase().equalsIgnoreCase("true"))
+ {
+ hinted_handoff_enabled_global = true;
+ }
+ else if (hinted_handoff_enabled.toLowerCase().equalsIgnoreCase("false"))
+ {
+ hinted_handoff_enabled_global = false;
+ }
+ else
+ {
+ try
+ {
+ hinted_handoff_enabled_by_dc.addAll(parseHintedHandoffEnabledDCs(hinted_handoff_enabled));
+ }
+ catch (IOException e)
+ {
+ throw new ConfigurationException("Invalid hinted_handoff_enabled parameter " + hinted_handoff_enabled, e);
+ }
+ }
+ }
+ }
+
+ public static List<String> parseHintedHandoffEnabledDCs(final String dcNames) throws IOException
+ {
+ final CsvListReader csvListReader = new CsvListReader(new StringReader(dcNames), STANDARD_SURROUNDING_SPACES_NEED_QUOTES);
+ return csvListReader.read();
+ }
+
public static enum CommitLogSync
{
periodic,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index e1a95ab..9e06601 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.config;
import java.io.File;
import java.io.FileFilter;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
@@ -1109,12 +1110,47 @@ public class DatabaseDescriptor
public static void setHintedHandoffEnabled(boolean hintedHandoffEnabled)
{
- conf.hinted_handoff_enabled = hintedHandoffEnabled;
+ conf.hinted_handoff_enabled_global = hintedHandoffEnabled;
+ conf.hinted_handoff_enabled_by_dc.clear();
+ }
+
+ public static void setHintedHandoffEnabled(final String dcNames)
+ {
+ List<String> dcNameList;
+ try
+ {
+ dcNameList = Config.parseHintedHandoffEnabledDCs(dcNames);
+ }
+ catch (IOException e)
+ {
+ throw new IllegalArgumentException("Could not read csv of dcs for hinted handoff enable. " + dcNames, e);
+ }
+
+ if (dcNameList.isEmpty())
+ throw new IllegalArgumentException("Empty list of Dcs for hinted handoff enable");
+
+ conf.hinted_handoff_enabled_by_dc.clear();
+ conf.hinted_handoff_enabled_by_dc.addAll(dcNameList);
}
public static boolean hintedHandoffEnabled()
{
- return conf.hinted_handoff_enabled;
+ return conf.hinted_handoff_enabled_global;
+ }
+
+ public static Set<String> hintedHandoffEnabledByDC()
+ {
+ return Collections.unmodifiableSet(conf.hinted_handoff_enabled_by_dc);
+ }
+
+ public static boolean shouldHintByDC()
+ {
+ return !conf.hinted_handoff_enabled_by_dc.isEmpty();
+ }
+
+ public static boolean hintedHandoffEnabled(final String dcName)
+ {
+ return conf.hinted_handoff_enabled_by_dc.contains(dcName);
}
public static void setMaxHintWindow(int ms)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index d8a138c..6b5a152 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -91,6 +91,7 @@ public class YamlConfigurationLoader implements ConfigurationLoader
constructor.setPropertyUtils(propertiesChecker);
Yaml yaml = new Yaml(constructor);
Config result = yaml.loadAs(input, Config.class);
+ result.configHintedHandoff();
propertiesChecker.check();
return result;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8d1f913..14c1ce3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1758,11 +1758,21 @@ public class StorageProxy implements StorageProxyMBean
return DatabaseDescriptor.hintedHandoffEnabled();
}
+ public Set<String> getHintedHandoffEnabledByDC()
+ {
+ return DatabaseDescriptor.hintedHandoffEnabledByDC();
+ }
+
public void setHintedHandoffEnabled(boolean b)
{
DatabaseDescriptor.setHintedHandoffEnabled(b);
}
+ public void setHintedHandoffEnabledByDCList(String dcNames)
+ {
+ DatabaseDescriptor.setHintedHandoffEnabled(dcNames);
+ }
+
public int getMaxHintWindow()
{
return DatabaseDescriptor.getMaxHintWindow();
@@ -1775,7 +1785,17 @@ public class StorageProxy implements StorageProxyMBean
public static boolean shouldHint(InetAddress ep)
{
- if (!DatabaseDescriptor.hintedHandoffEnabled())
+ if (DatabaseDescriptor.shouldHintByDC())
+ {
+ final String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(ep);
+ // Disable DC specific hints
+ if(!DatabaseDescriptor.hintedHandoffEnabled(dc))
+ {
+ HintedHandOffManager.instance.metrics.incrPastWindow(ep);
+ return false;
+ }
+ }
+ else if (!DatabaseDescriptor.hintedHandoffEnabled())
{
HintedHandOffManager.instance.metrics.incrPastWindow(ep);
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index ad7d4c7..203cabe 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public interface StorageProxyMBean
{
@@ -72,7 +73,9 @@ public interface StorageProxyMBean
public long getTotalHints();
public boolean getHintedHandoffEnabled();
+ public Set<String> getHintedHandoffEnabledByDC();
public void setHintedHandoffEnabled(boolean b);
+ public void setHintedHandoffEnabledByDCList(String dcs);
public int getMaxHintWindow();
public void setMaxHintWindow(int ms);
public int getMaxHintsInProgress();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index f3cd563..fb29342 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -1229,7 +1229,10 @@ public class NodeCmd
case DISABLEGOSSIP : probe.stopGossiping(); break;
case ENABLEGOSSIP : probe.startGossiping(); break;
case DISABLEHANDOFF : probe.disableHintedHandoff(); break;
- case ENABLEHANDOFF : probe.enableHintedHandoff(); break;
+ case ENABLEHANDOFF :
+ if (arguments.length > 0) { probe.enableHintedHandoff(arguments[0]); }
+ else { probe.enableHintedHandoff(); }
+ break;
case PAUSEHANDOFF : probe.pauseHintsDelivery(); break;
case RESUMEHANDOFF : probe.resumeHintsDelivery(); break;
case DISABLETHRIFT : probe.stopThriftServer(); break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/500c62d6/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index ffd6203..28cafb7 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -721,6 +721,11 @@ public class NodeProbe
spProxy.setHintedHandoffEnabled(true);
}
+ public void enableHintedHandoff(String dcNames)
+ {
+ spProxy.setHintedHandoffEnabledByDCList(dcNames);
+ }
+
public void pauseHintsDelivery()
{
hhProxy.pauseHintsDelivery(true);