You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/05/19 16:01:01 UTC
[10/16] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d693ca12
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d693ca12
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d693ca12
Branch: refs/heads/cassandra-2.1
Commit: d693ca12c76c2651df1769e137a94b954174e061
Parents: adb8831 be9eff5
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue May 19 08:50:28 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue May 19 08:50:28 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 4 +--
.../db/compaction/CompactionManager.java | 8 ++---
.../cassandra/db/compaction/Scrubber.java | 12 ++++---
.../cassandra/service/StorageService.java | 7 +++-
.../cassandra/service/StorageServiceMBean.java | 2 ++
.../org/apache/cassandra/tools/NodeProbe.java | 8 ++---
.../org/apache/cassandra/tools/NodeTool.java | 7 +++-
.../cassandra/tools/StandaloneScrubber.java | 6 +++-
.../unit/org/apache/cassandra/db/ScrubTest.java | 38 +++++++++++++-------
10 files changed, 63 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 198935b,cf124b4..6fc1c9c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -30,6 -3,10 +30,7 @@@ Merged from 2.0
* Clone SliceQueryFilter in AbstractReadCommand implementations (CASSANDRA-8940)
* Push correct protocol notification for DROP INDEX (CASSANDRA-9310)
* token-generator - generated tokens too long (CASSANDRA-9300)
+ * Add option not to validate atoms during scrub (CASSANDRA-9406)
-
-
-2.0.15:
* Fix counting of tombstones for TombstoneOverwhelmingException (CASSANDRA-9299)
* Fix ReconnectableSnitch reconnecting to peers during upgrade (CASSANDRA-6702)
* Include keyspace and table name in error log for collections over the size
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index bdc2d8b,eec4044..0951c01
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1394,22 -1117,22 +1394,22 @@@ public class ColumnFamilyStore implemen
return maxFile;
}
- public void forceCleanup(CounterId.OneShotRenewer renewer) throws ExecutionException, InterruptedException
+ public CompactionManager.AllSSTableOpStatus forceCleanup() throws ExecutionException, InterruptedException
{
- CompactionManager.instance.performCleanup(ColumnFamilyStore.this, renewer);
+ return CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
}
- public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted) throws ExecutionException, InterruptedException
- public void scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData) throws ExecutionException, InterruptedException
++ public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData) throws ExecutionException, InterruptedException
{
// skip snapshot creation during scrub, SEE JIRA 5891
if(!disableSnapshot)
snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis());
- return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted);
- CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData);
++ return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData);
}
- public void sstablesRewrite(boolean excludeCurrentVersion) throws ExecutionException, InterruptedException
+ public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion) throws ExecutionException, InterruptedException
{
- CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion);
+ return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion);
}
public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index c7232a0,207b90d..47bd2d6
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -246,83 -205,42 +246,83 @@@ public class CompactionManager implemen
}
}
- private static interface AllSSTablesOperation
- {
- public void perform(ColumnFamilyStore store, Iterable<SSTableReader> sstables) throws IOException;
- }
-
- private void performAllSSTableOperation(final ColumnFamilyStore cfs, final AllSSTablesOperation operation) throws InterruptedException, ExecutionException
+ private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation) throws ExecutionException, InterruptedException
{
- final Iterable<SSTableReader> sstables = cfs.markAllCompacting();
- if (sstables == null)
- return;
-
- Callable<Object> runnable = new Callable<Object>()
+ Iterable<SSTableReader> compactingSSTables = cfs.markAllCompacting();
+ if (compactingSSTables == null)
{
- public Object call() throws IOException
+ logger.info("Aborting operation on {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name);
+ return AllSSTableOpStatus.ABORTED;
+ }
+ if (Iterables.isEmpty(compactingSSTables))
+ {
+ logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name);
+ return AllSSTableOpStatus.SUCCESSFUL;
+ }
+ try
+ {
+ Iterable<SSTableReader> sstables = operation.filterSSTables(compactingSSTables);
+ List<Future<Object>> futures = new ArrayList<>();
+
+ for (final SSTableReader sstable : sstables)
{
- try
+ if (executor.isShutdown())
{
- operation.perform(cfs, sstables);
+ logger.info("Executor has shut down, not submitting task");
+ return AllSSTableOpStatus.ABORTED;
}
- finally
+
+ futures.add(executor.submit(new Callable<Object>()
{
- cfs.getDataTracker().unmarkCompacting(sstables);
- }
- return this;
+ @Override
+ public Object call() throws Exception
+ {
+ operation.execute(sstable);
+ return this;
+ }
+ }));
}
- };
- executor.submit(runnable).get();
+
+ for (Future<Object> f : futures)
+ f.get();
+ }
+ finally
+ {
+ cfs.getDataTracker().unmarkCompacting(compactingSSTables);
+ }
+ return AllSSTableOpStatus.SUCCESSFUL;
}
- public void performScrub(ColumnFamilyStore cfStore, final boolean skipCorrupted, final boolean checkData) throws InterruptedException, ExecutionException
+ private static interface OneSSTableOperation
{
- performAllSSTableOperation(cfStore, new AllSSTablesOperation()
+ Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input);
+ void execute(SSTableReader input) throws IOException;
+ }
+
+ public enum AllSSTableOpStatus { ABORTED(1), SUCCESSFUL(0);
+ public final int statusCode;
+
+ AllSSTableOpStatus(int statusCode)
{
- public void perform(ColumnFamilyStore store, Iterable<SSTableReader> sstables) throws IOException
+ this.statusCode = statusCode;
+ }
+ }
+
- public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted) throws InterruptedException, ExecutionException
++ public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData) throws InterruptedException, ExecutionException
+ {
+ assert !cfs.isIndex();
+ return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
+ {
+ @Override
+ public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input)
{
- doScrub(store, sstables, skipCorrupted, checkData);
+ return input;
+ }
+
+ @Override
+ public void execute(SSTableReader input) throws IOException
+ {
- scrubOne(cfs, input, skipCorrupted);
++ scrubOne(cfs, input, skipCorrupted, checkData);
}
});
}
@@@ -639,9 -425,23 +639,9 @@@
}
}
- private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted) throws IOException
- /**
- * Deserialize everything in the CFS and re-serialize w/ the newest version. Also attempts to recover
- * from bogus row keys / sizes using data from the index, and skips rows with garbage columns that resulted
- * from early ByteBuffer bugs.
- *
- * @throws IOException
- */
- private void doScrub(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, boolean skipCorrupted, boolean checkData) throws IOException
- {
- assert !cfs.isIndex();
- for (final SSTableReader sstable : sstables)
- scrubOne(cfs, sstable, skipCorrupted, checkData);
- }
-
+ private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean checkData) throws IOException
{
- Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false);
- Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, checkData);
++ Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false, checkData);
CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
metrics.beginCompaction(scrubInfo);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index e8814e4,e5bcd25..ec0532c
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -36,10 -33,11 +36,11 @@@ import org.apache.cassandra.utils.Outpu
public class Scrubber implements Closeable
{
- public final ColumnFamilyStore cfs;
- public final SSTableReader sstable;
- public final File destination;
- public final boolean skipCorrupted;
+ private final ColumnFamilyStore cfs;
+ private final SSTableReader sstable;
+ private final File destination;
+ private final boolean skipCorrupted;
+ public final boolean validateColumns;
private final CompactionController controller;
private final boolean isCommutative;
@@@ -74,18 -71,18 +75,19 @@@
};
private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
- public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline) throws IOException
- public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean checkData) throws IOException
++ public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException
{
- this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), isOffline);
- this(cfs, sstable, skipCorrupted, checkData, new OutputHandler.LogOutput(), false);
++ this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData);
}
- public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline) throws IOException
- public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean checkData, OutputHandler outputHandler, boolean isOffline) throws IOException
++ public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData) throws IOException
{
this.cfs = cfs;
this.sstable = sstable;
this.outputHandler = outputHandler;
this.skipCorrupted = skipCorrupted;
+ this.isOffline = isOffline;
+ this.validateColumns = checkData;
List<SSTableReader> toScrub = Collections.singletonList(sstable);
@@@ -184,15 -196,15 +186,15 @@@
}
if (dataSize > dataFile.length())
- throw new IOError(new IOException("Impossible row size (greater than file length): " + dataSize));
+ throw new IOError(new IOException("Impossible row size " + dataSize));
if (dataStart != dataStartFromIndex)
- outputHandler.warn(String.format("Data file row position %d differs from index file row position %d", dataStart, dataSizeFromIndex));
+ outputHandler.warn(String.format("Data file row position %d different from index file row position %d", dataStart, dataSizeFromIndex));
if (dataSize != dataSizeFromIndex)
- outputHandler.warn(String.format("Data file row size %d differs from index file row size %d", dataSize, dataSizeFromIndex));
+ outputHandler.warn(String.format("Data file row size %d different from index file row size %d", dataSize, dataSizeFromIndex));
- SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+ SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, validateColumns);
if (prevKey != null && prevKey.compareTo(key) > 0)
{
saveOutOfOrderRow(prevKey, key, atoms);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 1ea915b,62b0c75..7c8e424
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2289,38 -2179,28 +2289,43 @@@ public class StorageService extends Not
if (keyspaceName.equals(Keyspace.SYSTEM_KS))
throw new RuntimeException("Cleanup of the system keyspace is neither necessary nor wise");
- CounterId.OneShotRenewer counterIdRenewer = new CounterId.OneShotRenewer();
+ CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
{
- cfStore.forceCleanup(counterIdRenewer);
+ CompactionManager.AllSSTableOpStatus oneStatus = cfStore.forceCleanup();
+ if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
+ status = oneStatus;
}
+ return status.statusCode;
}
- public void scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- scrub(disableSnapshot, skipCorrupted, true, keyspaceName, columnFamilies);
++ return scrub(disableSnapshot, skipCorrupted, true, keyspaceName, columnFamilies);
+ }
+
- public void scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
++ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
+ CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
- cfStore.scrub(disableSnapshot, skipCorrupted, checkData);
+ {
- CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted);
++ CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData);
+ if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
+ status = oneStatus;
+ }
+ return status.statusCode;
}
- public void upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
+ CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, true, keyspaceName, columnFamilies))
- cfStore.sstablesRewrite(excludeCurrentVersion);
+ {
+ CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(excludeCurrentVersion);
+ if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
+ status = oneStatus;
+ }
+ return status.statusCode;
}
public void forceKeyspaceCompaction(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index ab34e1b,57780a3..1f86d82
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -256,7 -231,9 +256,9 @@@ public interface StorageServiceMBean ex
*
* Scrubbed CFs will be snapshotted first, if disableSnapshot is false
*/
+ @Deprecated
- public void scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
- public void scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+ public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
++ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
/**
* Rewrite all sstables to the latest version.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d693ca12/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index 5b97e79,e8e087f..6e7179a
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -223,49 -186,21 +223,49 @@@ public class NodeProbe implements AutoC
jmxc.close();
}
- public void forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
+ return ssProxy.forceKeyspaceCleanup(keyspaceName, columnFamilies);
+ }
+
- public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
++ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
- return ssProxy.scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies);
++ return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies);
+ }
+
+ public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
+ return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies);
+ }
+
+ public void forceKeyspaceCleanup(PrintStream out, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- ssProxy.forceKeyspaceCleanup(keyspaceName, columnFamilies);
+ if (forceKeyspaceCleanup(keyspaceName, columnFamilies) != 0)
+ {
+ failed = true;
+ out.println("Aborted cleaning up atleast one column family in keyspace "+keyspaceName+", check server logs for more information.");
+ }
}
- public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
- public void scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
++ public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- if (scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies) != 0)
- ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies);
++ if (scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies) != 0)
+ {
+ failed = true;
+ out.println("Aborted scrubbing atleast one column family in keyspace "+keyspaceName+", check server logs for more information.");
+ }
}
- public void upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies);
+ if (upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies) != 0)
+ {
+ failed = true;
+ out.println("Aborted upgrading sstables for atleast one column family in keyspace "+keyspaceName+", check server logs for more information.");
+ }
}
+
public void forceKeyspaceCompaction(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
ssProxy.forceKeyspaceCompaction(keyspaceName, columnFamilies);