You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/03/05 10:07:47 UTC
[cassandra] 01/02: Add possibility to copy SSTables in
SSTableImporter instead of moving them
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 309b3033d44c5cdc18d6e3897661966853d39407
Author: Stefan Miklosovic <st...@instaclustr.com>
AuthorDate: Mon Feb 15 16:23:10 2021 +0100
Add possibility to copy SSTables in SSTableImporter instead of moving them
Patch by Stefan Miklosovic; reviewed by Michael Semb Wever and Marcus Eriksson for CASSANDRA-16407
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 10 +-
.../cassandra/db/ColumnFamilyStoreMBean.java | 27 ++++-
.../org/apache/cassandra/db/SSTableImporter.java | 47 +++++++--
.../cassandra/io/sstable/format/SSTableReader.java | 23 ++++-
.../cassandra/io/sstable/format/SSTableWriter.java | 27 +++++
.../org/apache/cassandra/io/util/FileUtils.java | 115 ++++++++++++++++-----
src/java/org/apache/cassandra/tools/NodeProbe.java | 4 +-
.../apache/cassandra/tools/nodetool/Import.java | 7 +-
test/unit/org/apache/cassandra/db/ImportTest.java | 77 +++++++++++---
.../cassandra/io/sstable/SSTableReaderTest.java | 6 +-
11 files changed, 284 insertions(+), 60 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 839302b..24163c6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta5
+ * Add possibility to copy SSTables in SSTableImporter instead of moving them (CASSANDRA-16407)
* Fix DESCRIBE statement for CUSTOM indices with options (CASSANDRA-16482)
* Fix cassandra-stress JMX connection (CASSANDRA-16473)
* Avoid processing redundant application states on endpoint changes (CASSANDRA-16381)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 1089e13..00aebc4 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -709,7 +709,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
/**
* #{@inheritDoc}
*/
- public synchronized List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify)
+ public synchronized List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify, boolean copyData)
{
SSTableImporter.Options options = SSTableImporter.Options.options(srcPaths)
.resetLevel(resetLevel)
@@ -717,11 +717,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
.verifySSTables(verifySSTables)
.verifyTokens(verifyTokens)
.invalidateCaches(invalidateCaches)
- .extendedVerify(extendedVerify).build();
+ .extendedVerify(extendedVerify)
+ .copyData(copyData).build();
return sstableImporter.importNewSSTables(options);
}
+ public List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify)
+ {
+ return importNewSSTables(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify, false);
+ }
+
Descriptor getUniqueDescriptorFor(Descriptor descriptor, File targetDirectory)
{
Descriptor newDescriptor;
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index c2cf393..23757ba 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -152,18 +152,41 @@ public interface ColumnFamilyStoreMBean
* @param verifySSTables if the new sstables should be verified that they are not corrupt
* @param verifyTokens if the tokens in the new sstables should be verified that they are owned by the current node
* @param invalidateCaches if row cache should be invalidated for the keys in the new sstables
- * @param jbodCheck if the new sstables should be placed 'optimally' - count tokens and move the sstable to the directory where it has the most keys
* @param extendedVerify if we should run an extended verify checking all values in the new sstables
*
* @return list of failed import directories
*/
+ @Deprecated
+ public List<String> importNewSSTables(Set<String> srcPaths,
+ boolean resetLevel,
+ boolean clearRepaired,
+ boolean verifySSTables,
+ boolean verifyTokens,
+ boolean invalidateCaches,
+ boolean extendedVerify);
+
+ /**
+ * Load new sstables from the given directory
+ *
+ * @param srcPaths the path to the new sstables - if it is an empty set, the data directories will be scanned
+ * @param resetLevel if the level should be reset to 0 on the new sstables
+ * @param clearRepaired if repaired info should be wiped from the new sstables
+ * @param verifySSTables if the new sstables should be verified that they are not corrupt
+ * @param verifyTokens if the tokens in the new sstables should be verified that they are owned by the current node
+ * @param invalidateCaches if row cache should be invalidated for the keys in the new sstables
+ * @param extendedVerify if we should run an extended verify checking all values in the new sstables
+ * @param copyData if we should copy data from source paths instead of moving them
+ *
+ * @return list of failed import directories
+ */
public List<String> importNewSSTables(Set<String> srcPaths,
boolean resetLevel,
boolean clearRepaired,
boolean verifySSTables,
boolean verifyTokens,
boolean invalidateCaches,
- boolean extendedVerify);
+ boolean extendedVerify,
+ boolean copyData);
@Deprecated
public void loadNewSSTables();
diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java b/src/java/org/apache/cassandra/db/SSTableImporter.java
index 7597f82..989ff12 100644
--- a/src/java/org/apache/cassandra/db/SSTableImporter.java
+++ b/src/java/org/apache/cassandra/db/SSTableImporter.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -40,9 +39,7 @@ import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.KeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -139,7 +136,7 @@ public class SSTableImporter
Descriptor newDescriptor = cfs.getUniqueDescriptorFor(entry.getKey(), targetDir);
maybeMutateMetadata(entry.getKey(), options);
movedSSTables.add(new MovedSSTable(newDescriptor, entry.getKey(), entry.getValue()));
- SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue());
+ SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue(), options.copyData);
newSSTablesPerDirectory.add(sstable);
}
catch (Throwable t)
@@ -149,7 +146,14 @@ public class SSTableImporter
{
logger.error("Failed importing sstables in directory {}", dir, t);
failedDirectories.add(dir);
- moveSSTablesBack(movedSSTables);
+ if (options.copyData)
+ {
+ removeCopiedSSTables(movedSSTables);
+ }
+ else
+ {
+ moveSSTablesBack(movedSSTables);
+ }
movedSSTables.clear();
newSSTablesPerDirectory.clear();
break;
@@ -285,6 +289,25 @@ public class SSTableImporter
}
/**
+ * As in the moving case, when an import done by copying fails as a whole we need to delete every
+ * SSTable which was copied already, so that the failed import leaves no traces behind.
+ *
+ * @param movedSSTables SSTables already placed in the data directories (by copying) which need to be removed
+ */
+ private void removeCopiedSSTables(Set<MovedSSTable> movedSSTables)
+ {
+ logger.debug("Removing copied SSTables which were left in data directories after failed SSTable import.");
+ for (MovedSSTable movedSSTable : movedSSTables)
+ {
+ if (new File(movedSSTable.newDescriptor.filenameFor(Component.DATA)).exists())
+ {
+ // unlike the moveSSTablesBack case above, nothing is logged here as logging is done in the delete method
+ SSTableWriter.delete(movedSSTable.newDescriptor, movedSSTable.components);
+ }
+ }
+ }
+
+ /**
* Iterates over all keys in the sstable index and invalidates the row cache
*/
@VisibleForTesting
@@ -365,8 +388,9 @@ public class SSTableImporter
private final boolean verifyTokens;
private final boolean invalidateCaches;
private final boolean extendedVerify;
+ private final boolean copyData;
- public Options(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify)
+ public Options(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify, boolean copyData)
{
this.srcPaths = srcPaths;
this.resetLevel = resetLevel;
@@ -375,6 +399,7 @@ public class SSTableImporter
this.verifyTokens = verifyTokens;
this.invalidateCaches = invalidateCaches;
this.extendedVerify = extendedVerify;
+ this.copyData = copyData;
}
public static Builder options(String srcDir)
@@ -403,6 +428,7 @@ public class SSTableImporter
", verifyTokens=" + verifyTokens +
", invalidateCaches=" + invalidateCaches +
", extendedVerify=" + extendedVerify +
+ ", copyData= " + copyData +
'}';
}
@@ -415,6 +441,7 @@ public class SSTableImporter
private boolean verifyTokens = false;
private boolean invalidateCaches = false;
private boolean extendedVerify = false;
+ private boolean copyData = false;
private Builder(Set<String> srcPath)
{
@@ -458,9 +485,15 @@ public class SSTableImporter
return this;
}
+ public Builder copyData(boolean value)
+ {
+ copyData = value;
+ return this;
+ }
+
public Options build()
{
- return new Options(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify);
+ return new Options(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify, copyData);
}
}
}
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 3e14422..652d9c0 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -55,6 +55,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.UnknownColumnException;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.metadata.*;
@@ -2254,7 +2255,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
*
* All components given will be moved/renamed
*/
- public static SSTableReader moveAndOpenSSTable(ColumnFamilyStore cfs, Descriptor oldDescriptor, Descriptor newDescriptor, Set<Component> components)
+ public static SSTableReader moveAndOpenSSTable(ColumnFamilyStore cfs, Descriptor oldDescriptor, Descriptor newDescriptor, Set<Component> components, boolean copyData)
{
if (!oldDescriptor.isCompatible())
throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s",
@@ -2276,8 +2277,24 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
throw new RuntimeException(msg);
}
- logger.info("Renaming new SSTable {} to {}", oldDescriptor, newDescriptor);
- SSTableWriter.rename(oldDescriptor, newDescriptor, components);
+ if (copyData)
+ {
+ try
+ {
+ logger.info("Hardlinking new SSTable {} to {}", oldDescriptor, newDescriptor);
+ SSTableWriter.hardlink(oldDescriptor, newDescriptor, components);
+ }
+ catch (FSWriteError ex)
+ {
+ logger.warn("Unable to hardlink new SSTable {} to {}, falling back to copying", oldDescriptor, newDescriptor, ex);
+ SSTableWriter.copy(oldDescriptor, newDescriptor, components);
+ }
+ }
+ else
+ {
+ logger.info("Moving new SSTable {} to {}", oldDescriptor, newDescriptor);
+ SSTableWriter.rename(oldDescriptor, newDescriptor, components);
+ }
SSTableReader reader;
try
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 1dbfcdb..cce5378 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -344,6 +344,33 @@ public abstract class SSTableWriter extends SSTable implements Transactional
FileUtils.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
}
+ public static void copy(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
+ {
+ for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
+ {
+ FileUtils.copyWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
+ }
+
+ // do -Data last because -Data present should mean the sstable was completely copied before crash
+ FileUtils.copyWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
+
+ // copy it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader
+ FileUtils.copyWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
+ }
+
+ public static void hardlink(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
+ {
+ for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
+ {
+ FileUtils.createHardLinkWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
+ }
+
+ // do -Data last because -Data present should mean the sstable was completely hardlinked before crash
+ FileUtils.createHardLinkWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
+
+ // hardlink it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader
+ FileUtils.createHardLinkWithoutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
+ }
public static abstract class Factory
{
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index e0ea436..7798bd7 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -114,28 +114,6 @@ public final class FileUtils
}
}
- public static void createHardLink(String from, String to)
- {
- createHardLink(new File(from), new File(to));
- }
-
- public static void createHardLink(File from, File to)
- {
- if (to.exists())
- throw new RuntimeException("Tried to create duplicate hard link to " + to);
- if (!from.exists())
- throw new RuntimeException("Tried to hard link to file that does not exist " + from);
-
- try
- {
- Files.createLink(to.toPath(), from.toPath());
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, to);
- }
- }
-
private static final File tempDir = new File(JAVA_IO_TMPDIR.getString());
private static final AtomicLong tempFileNum = new AtomicLong();
@@ -193,6 +171,62 @@ public final class FileUtils
return f;
}
+ public static void createHardLink(String from, String to)
+ {
+ createHardLink(new File(from), new File(to));
+ }
+
+ public static void createHardLink(File from, File to)
+ {
+ if (to.exists())
+ throw new RuntimeException("Tried to create duplicate hard link to " + to);
+ if (!from.exists())
+ throw new RuntimeException("Tried to hard link to file that does not exist " + from);
+
+ try
+ {
+ Files.createLink(to.toPath(), from.toPath());
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, to);
+ }
+ }
+
+ public static void createHardLinkWithConfirm(File from, File to)
+ {
+ try
+ {
+ createHardLink(from, to);
+ }
+ catch (FSWriteError ex)
+ {
+ throw ex;
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException(String.format("Unable to hardlink from %s to %s", from, to), t);
+ }
+ }
+
+ public static void createHardLinkWithConfirm(String from, String to)
+ {
+ createHardLinkWithConfirm(new File(from), new File(to));
+ }
+
+ public static void createHardLinkWithoutConfirm(String from, String to)
+ {
+ try
+ {
+ createHardLink(new File(from), new File(to));
+ }
+ catch (FSWriteError fse)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Could not hardlink file " + from + " to " + to, fse);
+ }
+ }
+
public static Throwable deleteWithConfirm(String filePath, Throwable accumulate)
{
return deleteWithConfirm(new File(filePath), accumulate, null);
@@ -244,6 +278,40 @@ public final class FileUtils
maybeFail(deleteWithConfirm(file, null, rateLimiter));
}
+ public static void copyWithOutConfirm(String from, String to)
+ {
+ try
+ {
+ Files.copy(Paths.get(from), Paths.get(to));
+ }
+ catch (IOException e)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Could not copy file " + from + " to " + to, e);
+ }
+ }
+
+ public static void copyWithConfirm(String from, String to)
+ {
+ copyWithConfirm(new File(from), new File(to));
+ }
+
+ public static void copyWithConfirm(File from, File to)
+ {
+ assert from.exists();
+ if (logger.isTraceEnabled())
+ logger.trace("Copying {} to {}", from.getPath(), to.getPath());
+
+ try
+ {
+ Files.copy(from.toPath(), to.toPath());
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, "Could not copy file " + from + " to " + to);
+ }
+ }
+
public static void renameWithOutConfirm(String from, String to)
{
try
@@ -298,6 +366,7 @@ public final class FileUtils
}
}
+
public static void truncate(String path, long size)
{
try(FileChannel channel = FileChannel.open(Paths.get(path), StandardOpenOption.READ, StandardOpenOption.WRITE))
@@ -899,7 +968,7 @@ public final class FileUtils
* signed long (2^63-1), if the filesystem is any bigger, then the size overflows. {@code SafeFileStore} will
* return {@code Long.MAX_VALUE} if the size overflow.</p>
*
- * @see https://bugs.openjdk.java.net/browse/JDK-8162520.
+ * @see <a href="https://bugs.openjdk.java.net/browse/JDK-8162520">JDK-8162520</a>.
*/
private static final class SafeFileStore extends FileStore
{
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 90990f1..605edba 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1216,9 +1216,9 @@ public class NodeProbe implements AutoCloseable
ssProxy.loadNewSSTables(ksName, cfName);
}
- public List<String> importNewSSTables(String ksName, String cfName, Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify)
+ public List<String> importNewSSTables(String ksName, String cfName, Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify, boolean copyData)
{
- return getCfsProxy(ksName, cfName).importNewSSTables(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify);
+ return getCfsProxy(ksName, cfName).importNewSSTables(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify, copyData);
}
public void rebuildIndex(String ksName, String cfName, String... idxNames)
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Import.java b/src/java/org/apache/cassandra/tools/nodetool/Import.java
index 08fe35d..73fa314 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Import.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Import.java
@@ -75,6 +75,11 @@ public class Import extends NodeToolCmd
description = "Run an extended verify, verifying all values in the new sstables")
private boolean extendedVerify = false;
+ @Option(title = "copy_data",
+ name = {"-p", "--copy-data"},
+ description = "Copy data from source directories instead of moving them")
+ private boolean copyData = false;
+
@Override
public void execute(NodeProbe probe)
{
@@ -89,7 +94,7 @@ public class Import extends NodeToolCmd
extendedVerify = false;
}
List<String> srcPaths = Lists.newArrayList(args.subList(2, args.size()));
- List<String> failedDirs = probe.importNewSSTables(args.get(0), args.get(1), new HashSet<>(srcPaths), !keepLevel, !keepRepaired, !noVerify, !noVerifyTokens, !noInvalidateCaches, extendedVerify);
+ List<String> failedDirs = probe.importNewSSTables(args.get(0), args.get(1), new HashSet<>(srcPaths), !keepLevel, !keepRepaired, !noVerify, !noVerifyTokens, !noInvalidateCaches, extendedVerify, copyData);
if (!failedDirs.isEmpty())
{
PrintStream err = probe.output().err;
diff --git a/test/unit/org/apache/cassandra/db/ImportTest.java b/test/unit/org/apache/cassandra/db/ImportTest.java
index 4094aa4..c0c3799 100644
--- a/test/unit/org/apache/cassandra/db/ImportTest.java
+++ b/test/unit/org/apache/cassandra/db/ImportTest.java
@@ -23,19 +23,16 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Random;
import java.util.Set;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.cache.RowCacheKey;
@@ -43,8 +40,6 @@ import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.BootStrapper;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -53,20 +48,45 @@ import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ImportTest extends CQLTester
{
+
+ @Test
+ public void basicImportByMovingTest() throws Throwable
+ {
+ File backupDir = prepareBasicImporting();
+ // copy is false - so importing will be done by moving
+ importSSTables(SSTableImporter.Options.options(backupDir.toString()).copyData(false).build(), 10);
+ // files were moved
+ Assert.assertEquals(0, countFiles(backupDir));
+ }
+
@Test
- public void basicImportTest() throws Throwable
+ public void basicImportByCopyingTest() throws Throwable
+ {
+ File backupDir = prepareBasicImporting();
+ // copy is true - so importing will be done by copying
+ importSSTables(SSTableImporter.Options.options(backupDir.toString()).copyData(true).build(), 10);
+ // files are left there as they were just copied
+ Assert.assertNotEquals(0, countFiles(backupDir));
+ }
+
+ private File prepareBasicImporting() throws Throwable
{
createTable("create table %s (id int primary key, d int)");
+
+
for (int i = 0; i < 10; i++)
+ {
execute("insert into %s (id, d) values (?, ?)", i, i);
+ }
+
getCurrentColumnFamilyStore().forceBlockingFlush();
+
Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
getCurrentColumnFamilyStore().clearUnsafe();
@@ -74,11 +94,14 @@ public class ImportTest extends CQLTester
assertEquals(0, execute("select * from %s").size());
- SSTableImporter.Options options = SSTableImporter.Options.options(backupdir.toString()).build();
- SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore());
- importer.importNewSSTables(options);
+ return backupdir;
+ }
- assertEquals(10, execute("select * from %s").size());
+ private List<String> importSSTables(SSTableImporter.Options options, int expectedRows) throws Throwable {
+ SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore());
+ List<String> failedDirectories = importer.importNewSSTables(options);
+ assertEquals(expectedRows, execute("select * from %s").size());
+ return failedDirectories;
}
@Test
@@ -276,7 +299,7 @@ public class ImportTest extends CQLTester
sstable.selfRef().release();
}
- private void testCorruptHelper(boolean verify) throws Throwable
+ private void testCorruptHelper(boolean verify, boolean copy) throws Throwable
{
createTable("create table %s (id int primary key, d int)");
for (int i = 0; i < 10; i++)
@@ -312,7 +335,7 @@ public class ImportTest extends CQLTester
// first we moved out 2 sstables, one correct and one corrupt in to a single directory (backupdir)
// then we moved out 1 sstable, a correct one (in backupdirCorrect).
// now import should fail import on backupdir, but import the one in backupdirCorrect.
- SSTableImporter.Options options = SSTableImporter.Options.options(Sets.newHashSet(backupdir.toString(), backupdirCorrect.toString())).verifySSTables(verify).build();
+ SSTableImporter.Options options = SSTableImporter.Options.options(Sets.newHashSet(backupdir.toString(), backupdirCorrect.toString())).copyData(copy).verifySSTables(verify).build();
SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore());
List<String> failedDirectories = importer.importNewSSTables(options);
assertEquals(Collections.singletonList(backupdir.toString()), failedDirectories);
@@ -324,7 +347,15 @@ public class ImportTest extends CQLTester
}
assertEquals("Data dir should contain one file", 1, countFiles(getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables()));
assertEquals("backupdir contained 2 files before import, should still contain 2 after failing to import it", beforeImport, Sets.newHashSet(backupdir.listFiles()));
- assertEquals("backupdirCorrect contained 1 file before import, should be empty after import", 0, countFiles(backupdirCorrect));
+ if (copy)
+ {
+ assertEquals("backupdirCorrect contained 1 file before import, should contain 1 after import too", 1, countFiles(backupdirCorrect));
+ }
+ else
+ {
+ assertEquals("backupdirCorrect contained 1 file before import, should be empty after import", 0, countFiles(backupdirCorrect));
+ }
+
}
private int countFiles(File dir)
@@ -344,13 +375,25 @@ public class ImportTest extends CQLTester
@Test
public void testImportCorrupt() throws Throwable
{
- testCorruptHelper(true);
+ testCorruptHelper(true, false);
+ }
+
+ @Test
+ public void testImportCorruptWithCopying() throws Throwable
+ {
+ testCorruptHelper(true, true);
}
@Test
public void testImportCorruptWithoutValidation() throws Throwable
{
- testCorruptHelper(false);
+ testCorruptHelper(false, false);
+ }
+
+ @Test
+ public void testImportCorruptWithoutValidationWithCopying() throws Throwable
+ {
+ testCorruptHelper(false, true);
}
@Test
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index c8bdd09..0b64028 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -707,7 +707,7 @@ public class SSTableReaderTest
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
SSTableReader sstable = getNewSSTable(cfs);
Descriptor notLiveDesc = new Descriptor(new File("/tmp"), "", "", 0);
- SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components);
+ SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components, false);
}
@Test(expected = RuntimeException.class)
@@ -717,7 +717,7 @@ public class SSTableReaderTest
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
SSTableReader sstable = getNewSSTable(cfs);
Descriptor notLiveDesc = new Descriptor(new File("/tmp"), "", "", 0);
- SSTableReader.moveAndOpenSSTable(cfs, notLiveDesc, sstable.descriptor, sstable.components);
+ SSTableReader.moveAndOpenSSTable(cfs, notLiveDesc, sstable.descriptor, sstable.components, false);
}
@Test
@@ -738,7 +738,7 @@ public class SSTableReaderTest
assertFalse(f.exists());
assertTrue(new File(sstable.descriptor.filenameFor(c)).exists());
}
- SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components);
+ SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components, false);
// make sure the files were moved:
for (Component c : sstable.components)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org