You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sm...@apache.org on 2022/07/01 07:01:45 UTC

[cassandra] branch trunk updated: Allow sstableloader to specify table without relying on path

This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 33a9093c5c Allow sstableloader to specify table without relying on path
33a9093c5c is described below

commit 33a9093c5cc2f8fcf913d1931415b697e52ec108
Author: Eduard Tudenhoefner <ed...@datastax.com>
AuthorDate: Fri Jul 1 08:35:04 2022 +0200

    Allow sstableloader to specify table without relying on path
    
    patch by Eduard Tudenhoefner; reviewed by Stefan Miklosovic, Brandon Williams for CASSANDRA-16584
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/io/sstable/Descriptor.java    | 89 +++++++++++++++++-----
 .../org/apache/cassandra/io/sstable/SSTable.java   | 22 ++++++
 .../apache/cassandra/io/sstable/SSTableLoader.java | 18 ++++-
 .../org/apache/cassandra/tools/BulkLoader.java     |  4 +-
 .../org/apache/cassandra/tools/LoaderOptions.java  | 27 ++++++-
 .../cassandra/io/sstable/SSTableLoaderTest.java    | 86 +++++++++++++--------
 7 files changed, 191 insertions(+), 56 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 99f64b06a6..e5cbc8ea68 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Allow sstableloader to specify table without relying on path (CASSANDRA-16584)
  * Fix TestGossipingPropertyFileSnitch.test_prefer_local_reconnect_on_listen_address (CASSANDRA-17700)
  * Add ByteComparable API (CASSANDRA-6936)
  * Add guardrail for maximum replication factor (CASSANDRA-17500)
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 83bafd4ff4..589e46b015 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -218,6 +218,7 @@ public class Descriptor
 
     /**
      * Parse a sstable filename, extracting both the {@code Descriptor} and {@code Component} part.
+     * The keyspace/table name will be extracted from the directory path.
      *
      * @param file the {@code File} object for the filename to parse.
      * @return a pair of the descriptor and component corresponding to the provided {@code file}.
@@ -233,6 +234,58 @@ public class Descriptor
         if (!file.isAbsolute())
             file = file.toAbsolute();
 
+        SSTableInfo info = validateAndExtractInfo(file);
+        String name = file.name();
+
+        File directory = parentOf(name, file);
+        File tableDir = directory;
+
+        // Check if it's a 2ndary index directory (note that it doesn't exclude it to be also a backup or snapshot)
+        String indexName = "";
+        if (tableDir.name().startsWith(Directories.SECONDARY_INDEX_NAME_SEPARATOR))
+        {
+            indexName = tableDir.name();
+            tableDir = parentOf(name, tableDir);
+        }
+
+        // Then it can be a backup or a snapshot
+        if (tableDir.name().equals(Directories.BACKUPS_SUBDIR))
+            tableDir = tableDir.parent();
+        else if (parentOf(name, tableDir).name().equals(Directories.SNAPSHOT_SUBDIR))
+            tableDir = parentOf(name, parentOf(name, tableDir));
+
+        String table = tableDir.name().split("-")[0] + indexName;
+        String keyspace = parentOf(name, tableDir).name();
+
+        return Pair.create(new Descriptor(info.version, directory, keyspace, table, info.id, info.format), info.component);
+    }
+
+    /**
+     * Parse a sstable filename into a {@code Descriptor} and {@code Component}, using the supplied keyspace/table names.
+     *
+     * @param file     the {@code File} object for the filename to parse.
+     * @param keyspace The keyspace name of the file. If this or {@code table} is <code>null</code>, both names are
+     *                 extracted from the directory path instead.
+     * @param table    The table name of the file. If this or {@code keyspace} is <code>null</code>, both names are
+     *                 extracted from the directory path instead.
+     * @return a pair of the descriptor and component corresponding to the provided {@code file}.
+     * @throws IllegalArgumentException if the provided {@code file} does not point to a valid sstable filename. This
+     *                                  could mean either that the filename doesn't look like a sstable file, or that
+     *                                  it is for an old and unsupported version.
+     */
+    public static Pair<Descriptor, Component> fromFilenameWithComponent(File file, String keyspace, String table)
+    {
+        if (null == keyspace || null == table)
+        {
+            return fromFilenameWithComponent(file); // either name missing: derive both from the directory layout
+        }
+        // NOTE(review): unlike the single-argument overload, file is not made absolute before parentOf() - confirm.
+        SSTableInfo info = validateAndExtractInfo(file);
+        return Pair.create(new Descriptor(info.version, parentOf(file.name(), file), keyspace, table, info.id, info.format), info.component);
+    }
+
+    private static SSTableInfo validateAndExtractInfo(File file)
+    {
         String name = file.name();
         List<String> tokens = filenameSplitter.splitToList(name);
         int size = tokens.size();
@@ -245,9 +298,7 @@ public class Descriptor
             // Note that we assume it's an old format sstable if it has the right number of tokens: this is not perfect
             // but we're just trying to be helpful, not perfect.
             if (size == 5 || size == 6)
-                throw new IllegalArgumentException(String.format("%s is of version %s which is now unsupported and cannot be read.",
-                                                                 name,
-                                                                 tokens.get(size - 3)));
+                throw new IllegalArgumentException(String.format("%s is of version %s which is now unsupported and cannot be read.", name, tokens.get(size - 3)));
             throw new IllegalArgumentException(String.format("Invalid sstable file %s: the name doesn't look like a supported sstable file name", name));
         }
 
@@ -282,27 +333,23 @@ public class Descriptor
         if (!version.isCompatible())
             throw invalidSSTable(name, "incompatible sstable version (%s); you should have run upgradesstables before upgrading", versionString);
 
-        File directory = parentOf(name, file);
-        File tableDir = directory;
+        return new SSTableInfo(version, id, format, component);
+    }
 
-        // Check if it's a 2ndary index directory (not that it doesn't exclude it to be also a backup or snapshot)
-        String indexName = "";
-        if (Directories.isSecondaryIndexFolder(tableDir))
+    private static class SSTableInfo
+    {
+        final Version version;
+        final SSTableId id;
+        final SSTableFormat.Type format;
+        final Component component;
+
+        SSTableInfo(Version version, SSTableId id, SSTableFormat.Type format, Component component)
         {
-            indexName = tableDir.name();
-            tableDir = parentOf(name, tableDir);
+            this.version = version;
+            this.id = id;
+            this.format = format;
+            this.component = component;
         }
-
-        // Then it can be a backup or a snapshot
-        if (tableDir.name().equals(Directories.BACKUPS_SUBDIR))
-            tableDir = tableDir.parent();
-        else if (parentOf(name, tableDir).name().equals(Directories.SNAPSHOT_SUBDIR))
-            tableDir = parentOf(name, parentOf(name, tableDir));
-
-        String table = tableDir.name().split("-")[0] + indexName;
-        String keyspace = parentOf(name, tableDir).name();
-
-        return Pair.create(new Descriptor(version, directory, keyspace, table, id, format), component);
     }
 
     private static File parentOf(String name, File file)
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 488a7dc45a..6a691b1f0d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -216,6 +216,28 @@ public abstract class SSTable
         }
     }
 
+    /**
+     * Parse a sstable filename into both a {@link Descriptor} and {@code Component} object, returning {@code null} on failure.
+     *
+     * @param file     the filename to parse.
+     * @param keyspace The keyspace name of the file; if this or {@code table} is {@code null}, both come from the path.
+     * @param table    The table name of the file; if this or {@code keyspace} is {@code null}, both come from the path.
+     * @return a pair of the {@code Descriptor} and {@code Component} corresponding to {@code file} if it corresponds to
+     * a valid and supported sstable filename, {@code null} otherwise. Note that components of an unknown type will be
+     * returned as CUSTOM ones.
+     */
+    public static Pair<Descriptor, Component> tryComponentFromFilename(File file, String keyspace, String table)
+    {
+        try
+        {
+            return Descriptor.fromFilenameWithComponent(file, keyspace, table);
+        }
+        catch (Throwable e) // best effort: every failure (not only IllegalArgumentException) is reported as null
+        {
+            return null;
+        }
+    }
+
     /**
      * Parse a sstable filename into a {@link Descriptor} object.
      * <p>
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 3d9e0f4c17..71bd025db8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -48,6 +48,7 @@ public class SSTableLoader implements StreamEventHandler
 {
     private final File directory;
     private final String keyspace;
+    private final String table;
     private final Client client;
     private final int connectionsPerHost;
     private final OutputHandler outputHandler;
@@ -62,9 +63,15 @@ public class SSTableLoader implements StreamEventHandler
     }
 
     public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost, String targetKeyspace)
+    {
+        this(directory, client, outputHandler, connectionsPerHost, targetKeyspace, null);
+    }
+
+    public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost, String targetKeyspace, String targetTable)
     {
         this.directory = directory;
         this.keyspace = targetKeyspace != null ? targetKeyspace : directory.parent().name();
+        this.table = targetTable;
         this.client = client;
         this.outputHandler = outputHandler;
         this.connectionsPerHost = connectionsPerHost;
@@ -87,7 +94,16 @@ public class SSTableLoader implements StreamEventHandler
                                               return false;
                                           }
 
-                                          Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(file);
+                                          Pair<Descriptor, Component> p;
+                                          if (null != keyspace && null != table)
+                                          {
+                                              p = SSTable.tryComponentFromFilename(file, keyspace, table);
+                                          }
+                                          else
+                                          {
+                                              p = SSTable.tryComponentFromFilename(file);
+                                          }
+
                                           Descriptor desc = p == null ? null : p.left;
                                           if (p == null || !p.right.equals(Component.DATA))
                                               return false;
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 811df7ab97..a3a296b97a 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -66,7 +66,8 @@ public class BulkLoader
                         buildSSLOptions(options.clientEncOptions)),
                         handler,
                         options.connectionsPerHost,
-                        options.targetKeyspace);
+                        options.targetKeyspace,
+                        options.targetTable);
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
         DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(options.interDcThrottle);
         StreamResultFuture future = null;
@@ -82,7 +83,6 @@ public class BulkLoader
             {
                 future = loader.stream(options.ignores, indicator);
             }
-
         }
         catch (Exception e)
         {
diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java
index d882e5a853..27d54a7414 100644
--- a/src/java/org/apache/cassandra/tools/LoaderOptions.java
+++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java
@@ -66,6 +66,7 @@ public class LoaderOptions
     public static final String ENTIRE_SSTABLE_INTER_DC_THROTTLE_MBITS = "entire-sstable-inter-dc-throttle";
     public static final String TOOL_NAME = "sstableloader";
     public static final String TARGET_KEYSPACE = "target-keyspace";
+    public static final String TARGET_TABLE = "target-table";
 
     /* client encryption options */
     public static final String SSL_TRUSTSTORE = "truststore";
@@ -97,6 +98,7 @@ public class LoaderOptions
     public final Set<InetSocketAddress> hosts;
     public final Set<InetAddressAndPort> ignores;
     public final String targetKeyspace;
+    public final String targetTable;
 
     LoaderOptions(Builder builder)
     {
@@ -120,6 +122,7 @@ public class LoaderOptions
         hosts = builder.hosts;
         ignores = builder.ignores;
         targetKeyspace = builder.targetKeyspace;
+        targetTable = builder.targetTable;
     }
 
     static class Builder
@@ -147,6 +150,7 @@ public class LoaderOptions
         Set<InetSocketAddress> hosts = new HashSet<>();
         Set<InetAddressAndPort> ignores = new HashSet<>();
         String targetKeyspace;
+        String targetTable;
 
         Builder()
         {
@@ -328,6 +332,18 @@ public class LoaderOptions
             return this;
         }
 
+        public Builder targetKeyspace(String keyspace) // programmatic counterpart of the --target-keyspace option
+        {
+            this.targetKeyspace = keyspace;
+            return this; // returns this builder so calls can be chained
+        }
+
+        public Builder targetTable(String table) // programmatic counterpart of the --target-table option
+        {
+            this.targetTable = table; // must set targetTable; assigning targetKeyspace here would clobber the keyspace
+            return this; // returns this builder so calls can be chained
+        }
+
         public Builder parseArgs(String cmdArgs[])
         {
             CommandLineParser parser = new GnuParser();
@@ -566,10 +582,16 @@ public class LoaderOptions
                 {
                     targetKeyspace = cmd.getOptionValue(TARGET_KEYSPACE);
                     if (StringUtils.isBlank(targetKeyspace))
-                    {
                         errorMsg("Empty keyspace is not supported.", options);
-                    }
                 }
+
+                if (cmd.hasOption(TARGET_TABLE))
+                {
+                    targetTable = cmd.getOptionValue(TARGET_TABLE);
+                    if (StringUtils.isBlank(targetTable))
+                        errorMsg("Empty table is not supported.", options);
+                }
+
                 return this;
             }
             catch (ParseException | ConfigurationException | MalformedURLException e)
@@ -678,6 +700,7 @@ public class LoaderOptions
         options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use");
         options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL.");
         options.addOption("k", TARGET_KEYSPACE, "target keyspace name", "target keyspace name");
+        options.addOption("tb", TARGET_TABLE, "target table name", "target table name");
         return options;
     }
 
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index c941a81db2..0af6d24a0a 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 
 import com.google.common.io.Files;
 
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.io.util.File;
 import org.junit.After;
 import org.junit.Before;
@@ -40,7 +41,6 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.StreamEvent;
@@ -93,16 +93,33 @@ public class SSTableLoaderTest
     @After
     public void cleanup()
     {
-        try {
-            FileUtils.deleteRecursive(tmpdir);
-        } catch (FSWriteError e) {
+        try
+        {
+            tmpdir.deleteRecursive();
+        }
+        catch (FSWriteError e)
+        {
             /*
               We force a GC here to force buffer deallocation, and then try deleting the directory again.
               For more information, see: http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4715154
               If this is not the problem, the exception will be rethrown anyway.
              */
             System.gc();
-            FileUtils.deleteRecursive(tmpdir);
+            tmpdir.deleteRecursive();
+        }
+
+        try
+        {
+            for (String[] keyspaceTable : new String[][] { {KEYSPACE1, CF_STANDARD1},
+                                                           {KEYSPACE1, CF_STANDARD2},
+                                                           {KEYSPACE1, CF_BACKUPS},
+                                                           {KEYSPACE2, CF_STANDARD1},
+                                                           {KEYSPACE2, CF_STANDARD2}})
+            StorageService.instance.truncate(keyspaceTable[0], keyspaceTable[1]);
+        }
+        catch (Exception ex)
+        {
+            throw new RuntimeException("Unable to truncate table!", ex);
         }
     }
 
@@ -150,9 +167,11 @@ public class SSTableLoaderTest
         assertEquals(1, partitions.size());
         assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
         assert metadata != null;
-        assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
-                                                            .getCell(metadata.getColumn(ByteBufferUtil.bytes("val")))
-                                                            .buffer());
+
+        Row row = partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")));
+        assert row != null;
+
+        assertEquals(ByteBufferUtil.bytes("100"), row.getCell(metadata.getColumn(ByteBufferUtil.bytes("val"))).buffer());
 
         // The stream future is signalled when the work is complete but before releasing references. Wait for release
         // before cleanup (CASSANDRA-10118).
@@ -168,7 +187,7 @@ public class SSTableLoaderTest
                                                   .inDirectory(dataDir)
                                                   .forTable(String.format(schema, KEYSPACE1, CF_STANDARD2))
                                                   .using(String.format(query, KEYSPACE1, CF_STANDARD2))
-                                                  .withBufferSizeInMB(1)
+                                                  .withBufferSizeInMiB(1)
                                                   .build();
 
         int NB_PARTITIONS = 5000; // Enough to write >1MiB and get at least one completed sstable before we've closed the writer
@@ -209,10 +228,9 @@ public class SSTableLoaderTest
     }
 
     @Test
-    public void testLoadingSSTableToDifferentKeyspace() throws Exception
+    public void testLoadingSSTableToDifferentKeyspaceAndTable() throws Exception
     {
-        File dataDir = new File(tmpdir.absolutePath() + File.pathSeparator() + KEYSPACE1 + File.pathSeparator() + CF_STANDARD1);
-        assert dataDir.tryCreateDirectories();
+        File dataDir = dataDir(CF_STANDARD1);
         TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD1);
 
         String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
@@ -230,25 +248,31 @@ public class SSTableLoaderTest
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
         Util.flush(cfs); // wait for sstables to be on disk else we won't be able to stream them
 
-        final CountDownLatch latch = new CountDownLatch(1);
-        SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false), 1, KEYSPACE2);
-        loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
+        for (String table : new String[] { CF_STANDARD2, null })
+        {
+            final CountDownLatch latch = new CountDownLatch(1);
+            SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false), 1, KEYSPACE2, table);
+            loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
 
-        cfs = Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1);
-        Util.flush(cfs);
+            String targetTable = table == null ? CF_STANDARD1 : table;
+            cfs = Keyspace.open(KEYSPACE2).getColumnFamilyStore(targetTable);
+            Util.flush(cfs);
 
-        List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
+            List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
 
-        assertEquals(1, partitions.size());
-        assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
-        assert metadata != null;
-        assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
-                                                            .getCell(metadata.getColumn(ByteBufferUtil.bytes("val")))
-                                                            .buffer());
+            assertEquals(1, partitions.size());
+            assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
+            assert metadata != null;
 
-        // The stream future is signalled when the work is complete but before releasing references. Wait for release
-        // before cleanup (CASSANDRA-10118).
-        latch.await();
+            Row row = partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")));
+            assert row != null;
+
+            assertEquals(ByteBufferUtil.bytes("100"), row.getCell(metadata.getColumn(ByteBufferUtil.bytes("val"))).buffer());
+
+            // The stream future is signalled when the work is complete but before releasing references. Wait for release
+            // before cleanup (CASSANDRA-10118).
+            latch.await();
+        }
     }
 
     @Test
@@ -278,9 +302,11 @@ public class SSTableLoaderTest
         assertEquals(1, partitions.size());
         assertEquals("key", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
         assert metadata != null;
-        assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
-                                                            .getCell(metadata.getColumn(ByteBufferUtil.bytes("val")))
-                                                            .buffer());
+
+        Row row = partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")));
+        assert row != null;
+
+        assertEquals(ByteBufferUtil.bytes("100"), row.getCell(metadata.getColumn(ByteBufferUtil.bytes("val"))).buffer());
 
         // The stream future is signalled when the work is complete but before releasing references. Wait for release
         // before cleanup (CASSANDRA-10118).


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org