You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/03/05 10:07:47 UTC

[cassandra] 01/02: Add possibility to copy SSTables in SSTableImporter instead of moving them

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 309b3033d44c5cdc18d6e3897661966853d39407
Author: Stefan Miklosovic <st...@instaclustr.com>
AuthorDate: Mon Feb 15 16:23:10 2021 +0100

    Add possibility to copy SSTables in SSTableImporter instead of moving them
    
    Patch by Stefan Miklosovic; reviewed by Michael Semb Wever and Marcus Eriksson for CASSANDRA-16407
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  10 +-
 .../cassandra/db/ColumnFamilyStoreMBean.java       |  27 ++++-
 .../org/apache/cassandra/db/SSTableImporter.java   |  47 +++++++--
 .../cassandra/io/sstable/format/SSTableReader.java |  23 ++++-
 .../cassandra/io/sstable/format/SSTableWriter.java |  27 +++++
 .../org/apache/cassandra/io/util/FileUtils.java    | 115 ++++++++++++++++-----
 src/java/org/apache/cassandra/tools/NodeProbe.java |   4 +-
 .../apache/cassandra/tools/nodetool/Import.java    |   7 +-
 test/unit/org/apache/cassandra/db/ImportTest.java  |  77 +++++++++++---
 .../cassandra/io/sstable/SSTableReaderTest.java    |   6 +-
 11 files changed, 284 insertions(+), 60 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 839302b..24163c6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta5
+ * Add possibility to copy SSTables in SSTableImporter instead of moving them (CASSANDRA-16407)
  * Fix DESCRIBE statement for CUSTOM indices with options (CASSANDRA-16482)
  * Fix cassandra-stress JMX connection (CASSANDRA-16473)
  * Avoid processing redundant application states on endpoint changes (CASSANDRA-16381)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 1089e13..00aebc4 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -709,7 +709,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /**
      * #{@inheritDoc}
      */
-    public synchronized List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify)
+    public synchronized List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify, boolean copyData)
     {
         SSTableImporter.Options options = SSTableImporter.Options.options(srcPaths)
                                                                  .resetLevel(resetLevel)
@@ -717,11 +717,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                                  .verifySSTables(verifySSTables)
                                                                  .verifyTokens(verifyTokens)
                                                                  .invalidateCaches(invalidateCaches)
-                                                                 .extendedVerify(extendedVerify).build();
+                                                                 .extendedVerify(extendedVerify)
+                                                                 .copyData(copyData).build();
 
         return sstableImporter.importNewSSTables(options);
     }
 
+    public List<String> importNewSSTables(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify)
+    {
+        return importNewSSTables(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify, false);
+    }
+
     Descriptor getUniqueDescriptorFor(Descriptor descriptor, File targetDirectory)
     {
         Descriptor newDescriptor;
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index c2cf393..23757ba 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -152,18 +152,41 @@ public interface ColumnFamilyStoreMBean
      * @param verifySSTables if the new sstables should be verified that they are not corrupt
      * @param verifyTokens if the tokens in the new sstables should be verified that they are owned by the current node
      * @param invalidateCaches if row cache should be invalidated for the keys in the new sstables
-     * @param jbodCheck if the new sstables should be placed 'optimally' - count tokens and move the sstable to the directory where it has the most keys
      * @param extendedVerify if we should run an extended verify checking all values in the new sstables
      *
      * @return list of failed import directories
      */
+    @Deprecated
+    public List<String> importNewSSTables(Set<String> srcPaths,
+                                           boolean resetLevel,
+                                           boolean clearRepaired,
+                                           boolean verifySSTables,
+                                           boolean verifyTokens,
+                                           boolean invalidateCaches,
+                                           boolean extendedVerify);
+
+    /**
+     * Load new sstables from the given directory
+     *
+     * @param srcPaths the path to the new sstables - if it is an empty set, the data directories will be scanned
+     * @param resetLevel if the level should be reset to 0 on the new sstables
+     * @param clearRepaired if repaired info should be wiped from the new sstables
+     * @param verifySSTables if the new sstables should be verified that they are not corrupt
+     * @param verifyTokens if the tokens in the new sstables should be verified that they are owned by the current node
+     * @param invalidateCaches if row cache should be invalidated for the keys in the new sstables
+     * @param extendedVerify if we should run an extended verify checking all values in the new sstables
+     * @param copyData if we should copy data from source paths instead of moving them
+     *
+     * @return list of failed import directories
+     */
     public List<String> importNewSSTables(Set<String> srcPaths,
                                           boolean resetLevel,
                                           boolean clearRepaired,
                                           boolean verifySSTables,
                                           boolean verifyTokens,
                                           boolean invalidateCaches,
-                                          boolean extendedVerify);
+                                          boolean extendedVerify,
+                                          boolean copyData);
 
     @Deprecated
     public void loadNewSSTables();
diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java b/src/java/org/apache/cassandra/db/SSTableImporter.java
index 7597f82..989ff12 100644
--- a/src/java/org/apache/cassandra/db/SSTableImporter.java
+++ b/src/java/org/apache/cassandra/db/SSTableImporter.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -40,9 +39,7 @@ import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.KeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -139,7 +136,7 @@ public class SSTableImporter
                     Descriptor newDescriptor = cfs.getUniqueDescriptorFor(entry.getKey(), targetDir);
                     maybeMutateMetadata(entry.getKey(), options);
                     movedSSTables.add(new MovedSSTable(newDescriptor, entry.getKey(), entry.getValue()));
-                    SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue());
+                    SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue(), options.copyData);
                     newSSTablesPerDirectory.add(sstable);
                 }
                 catch (Throwable t)
@@ -149,7 +146,14 @@ public class SSTableImporter
                     {
                         logger.error("Failed importing sstables in directory {}", dir, t);
                         failedDirectories.add(dir);
-                        moveSSTablesBack(movedSSTables);
+                        if (options.copyData)
+                        {
+                            removeCopiedSSTables(movedSSTables);
+                        }
+                        else
+                        {
+                            moveSSTablesBack(movedSSTables);
+                        }
                         movedSSTables.clear();
                         newSSTablesPerDirectory.clear();
                         break;
@@ -285,6 +289,25 @@ public class SSTableImporter
     }
 
     /**
+     * Similarly for moving case, we need to delete all SSTables which were copied already but the
+     * copying as a whole has failed so we do not leave any traces behind such failed import.
+     *
+     * @param movedSSTables tables we have moved already (by copying) which need to be removed
+     */
+    private void removeCopiedSSTables(Set<MovedSSTable> movedSSTables)
+    {
+        logger.debug("Removing copied SSTables which were left in data directories after failed SSTable import.");
+        for (MovedSSTable movedSSTable : movedSSTables)
+        {
+            if (new File(movedSSTable.newDescriptor.filenameFor(Component.DATA)).exists())
+            {
+                // no logging here as for moveSSTablesBack case above as logging is done in delete method
+                SSTableWriter.delete(movedSSTable.newDescriptor, movedSSTable.components);
+            }
+        }
+    }
+
+    /**
      * Iterates over all keys in the sstable index and invalidates the row cache
      */
     @VisibleForTesting
@@ -365,8 +388,9 @@ public class SSTableImporter
         private final boolean verifyTokens;
         private final boolean invalidateCaches;
         private final boolean extendedVerify;
+        private final boolean copyData;
 
-        public Options(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify)
+        public Options(Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify, boolean copyData)
         {
             this.srcPaths = srcPaths;
             this.resetLevel = resetLevel;
@@ -375,6 +399,7 @@ public class SSTableImporter
             this.verifyTokens = verifyTokens;
             this.invalidateCaches = invalidateCaches;
             this.extendedVerify = extendedVerify;
+            this.copyData = copyData;
         }
 
         public static Builder options(String srcDir)
@@ -403,6 +428,7 @@ public class SSTableImporter
                    ", verifyTokens=" + verifyTokens +
                    ", invalidateCaches=" + invalidateCaches +
                    ", extendedVerify=" + extendedVerify +
+                   ", copyData= " + copyData +
                    '}';
         }
 
@@ -415,6 +441,7 @@ public class SSTableImporter
             private boolean verifyTokens = false;
             private boolean invalidateCaches = false;
             private boolean extendedVerify = false;
+            private boolean copyData = false;
 
             private Builder(Set<String> srcPath)
             {
@@ -458,9 +485,15 @@ public class SSTableImporter
                 return this;
             }
 
+            public Builder copyData(boolean value)
+            {
+                copyData = value;
+                return this;
+            }
+
             public Options build()
             {
-                return new Options(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify);
+                return new Options(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify, copyData);
             }
         }
     }
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 3e14422..652d9c0 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -55,6 +55,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.UnknownColumnException;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.metadata.*;
@@ -2254,7 +2255,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      *
      * All components given will be moved/renamed
      */
-    public static SSTableReader moveAndOpenSSTable(ColumnFamilyStore cfs, Descriptor oldDescriptor, Descriptor newDescriptor, Set<Component> components)
+    public static SSTableReader moveAndOpenSSTable(ColumnFamilyStore cfs, Descriptor oldDescriptor, Descriptor newDescriptor, Set<Component> components, boolean copyData)
     {
         if (!oldDescriptor.isCompatible())
             throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s",
@@ -2276,8 +2277,24 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             throw new RuntimeException(msg);
         }
 
-        logger.info("Renaming new SSTable {} to {}", oldDescriptor, newDescriptor);
-        SSTableWriter.rename(oldDescriptor, newDescriptor, components);
+        if (copyData)
+        {
+            try
+            {
+                logger.info("Hardlinking new SSTable {} to {}", oldDescriptor, newDescriptor);
+                SSTableWriter.hardlink(oldDescriptor, newDescriptor, components);
+            }
+            catch (FSWriteError ex)
+            {
+                logger.warn("Unable to hardlink new SSTable {} to {}, falling back to copying", oldDescriptor, newDescriptor, ex);
+                SSTableWriter.copy(oldDescriptor, newDescriptor, components);
+            }
+        }
+        else
+        {
+            logger.info("Moving new SSTable {} to {}", oldDescriptor, newDescriptor);
+            SSTableWriter.rename(oldDescriptor, newDescriptor, components);
+        }
 
         SSTableReader reader;
         try
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 1dbfcdb..cce5378 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -344,6 +344,33 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         FileUtils.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
     }
 
+    public static void copy(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
+    {
+        for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
+        {
+            FileUtils.copyWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
+        }
+
+        // do -Data last because -Data present should mean the sstable was completely copied before crash
+        FileUtils.copyWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
+
+        // copy it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader
+        FileUtils.copyWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
+    }
+
+    public static void hardlink(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
+    {
+        for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
+        {
+            FileUtils.createHardLinkWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
+        }
+
+        // do -Data last because -Data present should mean the sstable was completely copied before crash
+        FileUtils.createHardLinkWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
+
+        // copy it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader
+        FileUtils.createHardLinkWithoutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
+    }
 
     public static abstract class Factory
     {
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index e0ea436..7798bd7 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -114,28 +114,6 @@ public final class FileUtils
         }
     }
 
-    public static void createHardLink(String from, String to)
-    {
-        createHardLink(new File(from), new File(to));
-    }
-
-    public static void createHardLink(File from, File to)
-    {
-        if (to.exists())
-            throw new RuntimeException("Tried to create duplicate hard link to " + to);
-        if (!from.exists())
-            throw new RuntimeException("Tried to hard link to file that does not exist " + from);
-
-        try
-        {
-            Files.createLink(to.toPath(), from.toPath());
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, to);
-        }
-    }
-
     private static final File tempDir = new File(JAVA_IO_TMPDIR.getString());
     private static final AtomicLong tempFileNum = new AtomicLong();
 
@@ -193,6 +171,62 @@ public final class FileUtils
         return f;
     }
 
+    public static void createHardLink(String from, String to)
+    {
+        createHardLink(new File(from), new File(to));
+    }
+
+    public static void createHardLink(File from, File to)
+    {
+        if (to.exists())
+            throw new RuntimeException("Tried to create duplicate hard link to " + to);
+        if (!from.exists())
+            throw new RuntimeException("Tried to hard link to file that does not exist " + from);
+
+        try
+        {
+            Files.createLink(to.toPath(), from.toPath());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, to);
+        }
+    }
+
+    public static void createHardLinkWithConfirm(File from, File to)
+    {
+        try
+        {
+            createHardLink(from, to);
+        }
+        catch (FSWriteError ex)
+        {
+            throw ex;
+        }
+        catch (Throwable t)
+        {
+            throw new RuntimeException(String.format("Unable to hardlink from %s to %s", from, to), t);
+        }
+    }
+
+    public static void createHardLinkWithConfirm(String from, String to)
+    {
+        createHardLinkWithConfirm(new File(from), new File(to));
+    }
+
+    public static void createHardLinkWithoutConfirm(String from, String to)
+    {
+        try
+        {
+            createHardLink(new File(from), new File(to));
+        }
+        catch (FSWriteError fse)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Could not hardlink file " + from + " to " + to, fse);
+        }
+    }
+
     public static Throwable deleteWithConfirm(String filePath, Throwable accumulate)
     {
         return deleteWithConfirm(new File(filePath), accumulate, null);
@@ -244,6 +278,40 @@ public final class FileUtils
         maybeFail(deleteWithConfirm(file, null, rateLimiter));
     }
 
+    public static void copyWithOutConfirm(String from, String to)
+    {
+        try
+        {
+            Files.copy(Paths.get(from), Paths.get(to));
+        }
+        catch (IOException e)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Could not copy file" + from + " to " + to, e);
+        }
+    }
+
+    public static void copyWithConfirm(String from, String to)
+    {
+        copyWithConfirm(new File(from), new File(to));
+    }
+
+    public static void copyWithConfirm(File from, File to)
+    {
+        assert from.exists();
+        if (logger.isTraceEnabled())
+            logger.trace("Copying {} to {}", from.getPath(), to.getPath());
+
+        try
+        {
+            Files.copy(from.toPath(), to.toPath());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, "Could not copy file" + from + " to " + to);
+        }
+    }
+
     public static void renameWithOutConfirm(String from, String to)
     {
         try
@@ -298,6 +366,7 @@ public final class FileUtils
         }
 
     }
+
     public static void truncate(String path, long size)
     {
         try(FileChannel channel = FileChannel.open(Paths.get(path), StandardOpenOption.READ, StandardOpenOption.WRITE))
@@ -899,7 +968,7 @@ public final class FileUtils
      * signed long (2^63-1), if the filesystem is any bigger, then the size overflows. {@code SafeFileStore} will
      * return {@code Long.MAX_VALUE} if the size overflow.</p>
      *
-     * @see https://bugs.openjdk.java.net/browse/JDK-8162520.
+     * @see <a href="https://bugs.openjdk.java.net/browse/JDK-8162520">JDK-8162520</a>.
      */
     private static final class SafeFileStore extends FileStore
     {
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 90990f1..605edba 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1216,9 +1216,9 @@ public class NodeProbe implements AutoCloseable
         ssProxy.loadNewSSTables(ksName, cfName);
     }
 
-    public List<String> importNewSSTables(String ksName, String cfName, Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify)
+    public List<String> importNewSSTables(String ksName, String cfName, Set<String> srcPaths, boolean resetLevel, boolean clearRepaired, boolean verifySSTables, boolean verifyTokens, boolean invalidateCaches, boolean extendedVerify, boolean copyData)
     {
-        return getCfsProxy(ksName, cfName).importNewSSTables(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify);
+        return getCfsProxy(ksName, cfName).importNewSSTables(srcPaths, resetLevel, clearRepaired, verifySSTables, verifyTokens, invalidateCaches, extendedVerify, copyData);
     }
 
     public void rebuildIndex(String ksName, String cfName, String... idxNames)
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Import.java b/src/java/org/apache/cassandra/tools/nodetool/Import.java
index 08fe35d..73fa314 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Import.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Import.java
@@ -75,6 +75,11 @@ public class Import extends NodeToolCmd
             description = "Run an extended verify, verifying all values in the new sstables")
     private boolean extendedVerify = false;
 
+    @Option(title = "copy_data",
+            name = {"-p", "--copy-data"},
+            description = "Copy data from source directories instead of moving them")
+    private boolean copyData = false;
+
     @Override
     public void execute(NodeProbe probe)
     {
@@ -89,7 +94,7 @@ public class Import extends NodeToolCmd
             extendedVerify = false;
         }
         List<String> srcPaths = Lists.newArrayList(args.subList(2, args.size()));
-        List<String> failedDirs = probe.importNewSSTables(args.get(0), args.get(1), new HashSet<>(srcPaths), !keepLevel, !keepRepaired, !noVerify, !noVerifyTokens, !noInvalidateCaches, extendedVerify);
+        List<String> failedDirs = probe.importNewSSTables(args.get(0), args.get(1), new HashSet<>(srcPaths), !keepLevel, !keepRepaired, !noVerify, !noVerifyTokens, !noInvalidateCaches, extendedVerify, copyData);
         if (!failedDirs.isEmpty())
         {
             PrintStream err = probe.output().err;
diff --git a/test/unit/org/apache/cassandra/db/ImportTest.java b/test/unit/org/apache/cassandra/db/ImportTest.java
index 4094aa4..c0c3799 100644
--- a/test/unit/org/apache/cassandra/db/ImportTest.java
+++ b/test/unit/org/apache/cassandra/db/ImportTest.java
@@ -23,19 +23,16 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Random;
 import java.util.Set;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.cache.RowCacheKey;
@@ -43,8 +40,6 @@ import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.BootStrapper;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -53,20 +48,45 @@ import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class ImportTest extends CQLTester
 {
+
+    @Test
+    public void basicImportByMovingTest() throws Throwable
+    {
+        File backupDir = prepareBasicImporting();
+        // copy is false - so importing will be done by moving
+        importSSTables(SSTableImporter.Options.options(backupDir.toString()).copyData(false).build(), 10);
+        // files were moved
+        Assert.assertEquals(0, countFiles(backupDir));
+    }
+
     @Test
-    public void basicImportTest() throws Throwable
+    public void basicImportByCopyingTest() throws Throwable
+    {
+        File backupDir = prepareBasicImporting();
+        // copy is true - so importing will be done by copying
+        importSSTables(SSTableImporter.Options.options(backupDir.toString()).copyData(true).build(), 10);
+        // files are left there as they were just copied
+        Assert.assertNotEquals(0, countFiles(backupDir));
+    }
+
+    private File prepareBasicImporting() throws Throwable
     {
         createTable("create table %s (id int primary key, d int)");
+
+
         for (int i = 0; i < 10; i++)
+        {
             execute("insert into %s (id, d) values (?, ?)", i, i);
+        }
+
         getCurrentColumnFamilyStore().forceBlockingFlush();
+
         Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
         getCurrentColumnFamilyStore().clearUnsafe();
 
@@ -74,11 +94,14 @@ public class ImportTest extends CQLTester
 
         assertEquals(0, execute("select * from %s").size());
 
-        SSTableImporter.Options options = SSTableImporter.Options.options(backupdir.toString()).build();
-        SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore());
-        importer.importNewSSTables(options);
+        return backupdir;
+    }
 
-        assertEquals(10, execute("select * from %s").size());
+    private List<String> importSSTables(SSTableImporter.Options options, int expectedRows) throws Throwable {
+        SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore());
+        List<String> failedDirectories = importer.importNewSSTables(options);
+        assertEquals(expectedRows, execute("select * from %s").size());
+        return failedDirectories;
     }
 
     @Test
@@ -276,7 +299,7 @@ public class ImportTest extends CQLTester
             sstable.selfRef().release();
     }
 
-    private void testCorruptHelper(boolean verify) throws Throwable
+    private void testCorruptHelper(boolean verify, boolean copy) throws Throwable
     {
         createTable("create table %s (id int primary key, d int)");
         for (int i = 0; i < 10; i++)
@@ -312,7 +335,7 @@ public class ImportTest extends CQLTester
         // first we moved out 2 sstables, one correct and one corrupt in to a single directory (backupdir)
         // then we moved out 1 sstable, a correct one (in backupdirCorrect).
         // now import should fail import on backupdir, but import the one in backupdirCorrect.
-        SSTableImporter.Options options = SSTableImporter.Options.options(Sets.newHashSet(backupdir.toString(), backupdirCorrect.toString())).verifySSTables(verify).build();
+        SSTableImporter.Options options = SSTableImporter.Options.options(Sets.newHashSet(backupdir.toString(), backupdirCorrect.toString())).copyData(copy).verifySSTables(verify).build();
         SSTableImporter importer = new SSTableImporter(getCurrentColumnFamilyStore());
         List<String> failedDirectories = importer.importNewSSTables(options);
         assertEquals(Collections.singletonList(backupdir.toString()), failedDirectories);
@@ -324,7 +347,15 @@ public class ImportTest extends CQLTester
         }
         assertEquals("Data dir should contain one file", 1, countFiles(getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables()));
         assertEquals("backupdir contained 2 files before import, should still contain 2 after failing to import it", beforeImport, Sets.newHashSet(backupdir.listFiles()));
-        assertEquals("backupdirCorrect contained 1 file before import, should be empty after import", 0, countFiles(backupdirCorrect));
+        if (copy)
+        {
+            assertEquals("backupdirCorrect contained 1 file before import, should contain 1 after import too", 1, countFiles(backupdirCorrect));
+        }
+        else
+        {
+            assertEquals("backupdirCorrect contained 1 file before import, should be empty after import", 0, countFiles(backupdirCorrect));
+        }
+
     }
 
     private int countFiles(File dir)
@@ -344,13 +375,25 @@ public class ImportTest extends CQLTester
     @Test
     public void testImportCorrupt() throws Throwable
     {
-        testCorruptHelper(true);
+        testCorruptHelper(true, false);
+    }
+
+    @Test
+    public void testImportCorruptWithCopying() throws Throwable
+    {
+        testCorruptHelper(true, true);
     }
 
     @Test
     public void testImportCorruptWithoutValidation() throws Throwable
     {
-        testCorruptHelper(false);
+        testCorruptHelper(false, false);
+    }
+
+    @Test
+    public void testImportCorruptWithoutValidationWithCopying() throws Throwable
+    {
+        testCorruptHelper(false, true);
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index c8bdd09..0b64028 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -707,7 +707,7 @@ public class SSTableReaderTest
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
         SSTableReader sstable = getNewSSTable(cfs);
         Descriptor notLiveDesc = new Descriptor(new File("/tmp"), "", "", 0);
-        SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components);
+        SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components, false);
     }
 
     @Test(expected = RuntimeException.class)
@@ -717,7 +717,7 @@ public class SSTableReaderTest
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
         SSTableReader sstable = getNewSSTable(cfs);
         Descriptor notLiveDesc = new Descriptor(new File("/tmp"), "", "", 0);
-        SSTableReader.moveAndOpenSSTable(cfs, notLiveDesc, sstable.descriptor, sstable.components);
+        SSTableReader.moveAndOpenSSTable(cfs, notLiveDesc, sstable.descriptor, sstable.components, false);
     }
 
     @Test
@@ -738,7 +738,7 @@ public class SSTableReaderTest
             assertFalse(f.exists());
             assertTrue(new File(sstable.descriptor.filenameFor(c)).exists());
         }
-        SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components);
+        SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, notLiveDesc, sstable.components, false);
         // make sure the files were moved:
         for (Component c : sstable.components)
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org