You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sm...@apache.org on 2022/07/01 07:01:45 UTC
[cassandra] branch trunk updated: Allow sstableloader to specify table without relying on path
This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 33a9093c5c Allow sstableloader to specify table without relying on path
33a9093c5c is described below
commit 33a9093c5cc2f8fcf913d1931415b697e52ec108
Author: Eduard Tudenhoefner <ed...@datastax.com>
AuthorDate: Fri Jul 1 08:35:04 2022 +0200
Allow sstableloader to specify table without relying on path
patch by Eduard Tudenhoefner; reviewed by Stefan Miklosovic, Brandon Williams for CASSANDRA-16584
---
CHANGES.txt | 1 +
.../apache/cassandra/io/sstable/Descriptor.java | 89 +++++++++++++++++-----
.../org/apache/cassandra/io/sstable/SSTable.java | 22 ++++++
.../apache/cassandra/io/sstable/SSTableLoader.java | 18 ++++-
.../org/apache/cassandra/tools/BulkLoader.java | 4 +-
.../org/apache/cassandra/tools/LoaderOptions.java | 27 ++++++-
.../cassandra/io/sstable/SSTableLoaderTest.java | 86 +++++++++++++--------
7 files changed, 191 insertions(+), 56 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 99f64b06a6..e5cbc8ea68 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Allow sstableloader to specify table without relying on path (CASSANDRA-16584)
* Fix TestGossipingPropertyFileSnitch.test_prefer_local_reconnect_on_listen_address (CASSANDRA-17700)
* Add ByteComparable API (CASSANDRA-6936)
* Add guardrail for maximum replication factor (CASSANDRA-17500)
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 83bafd4ff4..589e46b015 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -218,6 +218,7 @@ public class Descriptor
/**
* Parse a sstable filename, extracting both the {@code Descriptor} and {@code Component} part.
+ * The keyspace and table names are extracted from the directory path.
*
* @param file the {@code File} object for the filename to parse.
* @return a pair of the descriptor and component corresponding to the provided {@code file}.
@@ -233,6 +234,58 @@ public class Descriptor
if (!file.isAbsolute())
file = file.toAbsolute();
+ SSTableInfo info = validateAndExtractInfo(file);
+ String name = file.name();
+
+ File directory = parentOf(name, file);
+ File tableDir = directory;
+
+ // Check if it's a 2ndary index directory (note that this doesn't exclude it from also being a backup or snapshot)
+ String indexName = "";
+ if (Directories.isSecondaryIndexFolder(tableDir))
+ {
+ indexName = tableDir.name();
+ tableDir = parentOf(name, tableDir);
+ }
+
+ // Then it can be a backup or a snapshot
+ if (tableDir.name().equals(Directories.BACKUPS_SUBDIR))
+ tableDir = tableDir.parent();
+ else if (parentOf(name, tableDir).name().equals(Directories.SNAPSHOT_SUBDIR))
+ tableDir = parentOf(name, parentOf(name, tableDir));
+
+ String table = tableDir.name().split("-")[0] + indexName;
+ String keyspace = parentOf(name, tableDir).name();
+
+ return Pair.create(new Descriptor(info.version, directory, keyspace, table, info.id, info.format), info.component);
+ }
+
+ /**
+ * Parse a sstable filename, extracting both the {@code Descriptor} and {@code Component} part.
+ *
+ * @param file the {@code File} object for the filename to parse.
+ * @param keyspace The keyspace name of the file. If either this or {@code table} is <code>null</code>, both the
+ * keyspace and the table name will be extracted from the directory path.
+ * @param table The table name of the file. If either this or {@code keyspace} is <code>null</code>, both names
+ * will be extracted from the directory path.
+ * @return a pair of the descriptor and component corresponding to the provided {@code file}.
+ * @throws IllegalArgumentException if the provided {@code file} does not point to a valid sstable filename. This could
+ * mean either that the filename doesn't look like a sstable file, or that it is for an old and unsupported
+ * version.
+ */
+ public static Pair<Descriptor, Component> fromFilenameWithComponent(File file, String keyspace, String table)
+ {
+ if (null == keyspace || null == table)
+ return fromFilenameWithComponent(file);
+
+ if (!file.isAbsolute())
+ file = file.toAbsolute();
+ SSTableInfo info = validateAndExtractInfo(file);
+ return Pair.create(new Descriptor(info.version, parentOf(file.name(), file), keyspace, table, info.id, info.format), info.component);
+ }
+
+ private static SSTableInfo validateAndExtractInfo(File file)
+ {
String name = file.name();
List<String> tokens = filenameSplitter.splitToList(name);
int size = tokens.size();
@@ -245,9 +298,7 @@ public class Descriptor
// Note that we assume it's an old format sstable if it has the right number of tokens: this is not perfect
// but we're just trying to be helpful, not perfect.
if (size == 5 || size == 6)
- throw new IllegalArgumentException(String.format("%s is of version %s which is now unsupported and cannot be read.",
- name,
- tokens.get(size - 3)));
+ throw new IllegalArgumentException(String.format("%s is of version %s which is now unsupported and cannot be read.", name, tokens.get(size - 3)));
throw new IllegalArgumentException(String.format("Invalid sstable file %s: the name doesn't look like a supported sstable file name", name));
}
@@ -282,27 +333,23 @@ public class Descriptor
if (!version.isCompatible())
throw invalidSSTable(name, "incompatible sstable version (%s); you should have run upgradesstables before upgrading", versionString);
- File directory = parentOf(name, file);
- File tableDir = directory;
+ return new SSTableInfo(version, id, format, component);
+ }
- // Check if it's a 2ndary index directory (not that it doesn't exclude it to be also a backup or snapshot)
- String indexName = "";
- if (Directories.isSecondaryIndexFolder(tableDir))
+ private static class SSTableInfo
+ {
+ final Version version;
+ final SSTableId id;
+ final SSTableFormat.Type format;
+ final Component component;
+
+ SSTableInfo(Version version, SSTableId id, SSTableFormat.Type format, Component component)
{
- indexName = tableDir.name();
- tableDir = parentOf(name, tableDir);
+ this.version = version;
+ this.id = id;
+ this.format = format;
+ this.component = component;
}
-
- // Then it can be a backup or a snapshot
- if (tableDir.name().equals(Directories.BACKUPS_SUBDIR))
- tableDir = tableDir.parent();
- else if (parentOf(name, tableDir).name().equals(Directories.SNAPSHOT_SUBDIR))
- tableDir = parentOf(name, parentOf(name, tableDir));
-
- String table = tableDir.name().split("-")[0] + indexName;
- String keyspace = parentOf(name, tableDir).name();
-
- return Pair.create(new Descriptor(version, directory, keyspace, table, id, format), component);
}
private static File parentOf(String name, File file)
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 488a7dc45a..6a691b1f0d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -216,6 +216,28 @@ public abstract class SSTable
}
}
+ /**
+ * Parse a sstable filename into both a {@link Descriptor} and {@code Component} object.
+ *
+ * @param file the filename to parse.
+ * @param keyspace The keyspace name of the file; if it or {@code table} is {@code null}, both come from the path.
+ * @param table The table name of the file; if it or {@code keyspace} is {@code null}, both come from the path.
+ * @return a pair of the {@code Descriptor} and {@code Component} corresponding to {@code file} if it corresponds to
+ * a valid and supported sstable filename, {@code null} otherwise. Note that components of an unknown type will be
+ * returned as CUSTOM ones.
+ */
+ public static Pair<Descriptor, Component> tryComponentFromFilename(File file, String keyspace, String table)
+ {
+ try
+ {
+ return Descriptor.fromFilenameWithComponent(file, keyspace, table);
+ }
+ catch (Throwable e)
+ {
+ return null;
+ }
+ }
+
/**
* Parse a sstable filename into a {@link Descriptor} object.
* <p>
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 3d9e0f4c17..71bd025db8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -48,6 +48,7 @@ public class SSTableLoader implements StreamEventHandler
{
private final File directory;
private final String keyspace;
+ private final String table; // may be null: the table name is then taken from each sstable's path
private final Client client;
private final int connectionsPerHost;
private final OutputHandler outputHandler;
@@ -62,9 +63,15 @@ public class SSTableLoader implements StreamEventHandler
}
public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost, String targetKeyspace)
+ {
+ this(directory, client, outputHandler, connectionsPerHost, targetKeyspace, null);
+ }
+ /** Like the constructor above, but a non-null {@code targetTable} replaces the table name parsed from the sstable path. */
+ public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost, String targetKeyspace, String targetTable)
{
this.directory = directory;
this.keyspace = targetKeyspace != null ? targetKeyspace : directory.parent().name();
+ this.table = targetTable;
this.client = client;
this.outputHandler = outputHandler;
this.connectionsPerHost = connectionsPerHost;
@@ -87,7 +94,16 @@ public class SSTableLoader implements StreamEventHandler
return false;
}
- Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(file);
+ Pair<Descriptor, Component> p;
+ if (null != keyspace && null != table) // explicit names: don't derive keyspace/table from the sstable path
+ {
+ p = SSTable.tryComponentFromFilename(file, keyspace, table);
+ }
+ else
+ {
+ p = SSTable.tryComponentFromFilename(file);
+ }
+
Descriptor desc = p == null ? null : p.left;
if (p == null || !p.right.equals(Component.DATA))
return false;
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 811df7ab97..a3a296b97a 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -66,7 +66,8 @@ public class BulkLoader
buildSSLOptions(options.clientEncOptions)),
handler,
options.connectionsPerHost,
- options.targetKeyspace);
+ options.targetKeyspace,
+ options.targetTable); // null unless --target-table (-tb) was given
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(options.interDcThrottle);
StreamResultFuture future = null;
@@ -82,7 +83,6 @@ public class BulkLoader
{
future = loader.stream(options.ignores, indicator);
}
-
}
catch (Exception e)
{
diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java
index d882e5a853..27d54a7414 100644
--- a/src/java/org/apache/cassandra/tools/LoaderOptions.java
+++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java
@@ -66,6 +66,7 @@ public class LoaderOptions
public static final String ENTIRE_SSTABLE_INTER_DC_THROTTLE_MBITS = "entire-sstable-inter-dc-throttle";
public static final String TOOL_NAME = "sstableloader";
public static final String TARGET_KEYSPACE = "target-keyspace";
+ public static final String TARGET_TABLE = "target-table";
/* client encryption options */
public static final String SSL_TRUSTSTORE = "truststore";
@@ -97,6 +98,7 @@ public class LoaderOptions
public final Set<InetSocketAddress> hosts;
public final Set<InetAddressAndPort> ignores;
public final String targetKeyspace;
+ public final String targetTable;
LoaderOptions(Builder builder)
{
@@ -120,6 +122,7 @@ public class LoaderOptions
hosts = builder.hosts;
ignores = builder.ignores;
targetKeyspace = builder.targetKeyspace;
+ targetTable = builder.targetTable;
}
static class Builder
@@ -147,6 +150,7 @@ public class LoaderOptions
Set<InetSocketAddress> hosts = new HashSet<>();
Set<InetAddressAndPort> ignores = new HashSet<>();
String targetKeyspace;
+ String targetTable;
Builder()
{
@@ -328,6 +332,18 @@ public class LoaderOptions
return this;
}
+ public Builder targetKeyspace(String keyspace)
+ {
+ this.targetKeyspace = keyspace;
+ return this;
+ }
+ /** Sets the table to load into; if left null, the table name is derived from each sstable's path. */
+ public Builder targetTable(String table)
+ {
+ this.targetTable = table;
+ return this;
+ }
+
public Builder parseArgs(String cmdArgs[])
{
CommandLineParser parser = new GnuParser();
@@ -566,10 +582,16 @@ public class LoaderOptions
{
targetKeyspace = cmd.getOptionValue(TARGET_KEYSPACE);
if (StringUtils.isBlank(targetKeyspace))
- {
errorMsg("Empty keyspace is not supported.", options);
- }
}
+
+ if (cmd.hasOption(TARGET_TABLE))
+ {
+ targetTable = cmd.getOptionValue(TARGET_TABLE);
+ if (StringUtils.isBlank(targetTable))
+ errorMsg("Empty table is not supported.", options);
+ }
+
return this;
}
catch (ParseException | ConfigurationException | MalformedURLException e)
@@ -678,6 +700,7 @@ public class LoaderOptions
options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use");
options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL.");
options.addOption("k", TARGET_KEYSPACE, "target keyspace name", "target keyspace name");
+ options.addOption("tb", TARGET_TABLE, "target table name", "target table name");
return options;
}
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index c941a81db2..0af6d24a0a 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import com.google.common.io.Files;
+import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.io.util.File;
import org.junit.After;
import org.junit.Before;
@@ -40,7 +41,6 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.StreamEvent;
@@ -93,16 +93,33 @@ public class SSTableLoaderTest
@After
public void cleanup()
{
- try {
- FileUtils.deleteRecursive(tmpdir);
- } catch (FSWriteError e) {
+ try
+ {
+ tmpdir.deleteRecursive();
+ }
+ catch (FSWriteError e)
+ {
/*
We force a GC here to force buffer deallocation, and then try deleting the directory again.
For more information, see: http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4715154
If this is not the problem, the exception will be rethrown anyway.
*/
System.gc();
- FileUtils.deleteRecursive(tmpdir);
+ tmpdir.deleteRecursive();
+ }
+ // After each test, truncate every table the tests load data into.
+ try
+ {
+ for (String[] keyspaceTable : new String[][] { {KEYSPACE1, CF_STANDARD1},
+ {KEYSPACE1, CF_STANDARD2},
+ {KEYSPACE1, CF_BACKUPS},
+ {KEYSPACE2, CF_STANDARD1},
+ {KEYSPACE2, CF_STANDARD2}})
+ StorageService.instance.truncate(keyspaceTable[0], keyspaceTable[1]);
+ }
+ catch (Exception ex)
+ {
+ throw new RuntimeException("Unable to truncate table!", ex);
}
}
@@ -150,9 +167,11 @@ public class SSTableLoaderTest
assertEquals(1, partitions.size());
assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
assert metadata != null;
- assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
- .getCell(metadata.getColumn(ByteBufferUtil.bytes("val")))
- .buffer());
+
+ Row row = partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")));
+ assert row != null;
+
+ assertEquals(ByteBufferUtil.bytes("100"), row.getCell(metadata.getColumn(ByteBufferUtil.bytes("val"))).buffer());
// The stream future is signalled when the work is complete but before releasing references. Wait for release
// before cleanup (CASSANDRA-10118).
@@ -168,7 +187,7 @@ public class SSTableLoaderTest
.inDirectory(dataDir)
.forTable(String.format(schema, KEYSPACE1, CF_STANDARD2))
.using(String.format(query, KEYSPACE1, CF_STANDARD2))
- .withBufferSizeInMB(1)
+ .withBufferSizeInMiB(1)
.build();
int NB_PARTITIONS = 5000; // Enough to write >1MiB and get at least one completed sstable before we've closed the writer
@@ -209,10 +228,9 @@ public class SSTableLoaderTest
}
@Test
- public void testLoadingSSTableToDifferentKeyspace() throws Exception
+ public void testLoadingSSTableToDifferentKeyspaceAndTable() throws Exception
{
- File dataDir = new File(tmpdir.absolutePath() + File.pathSeparator() + KEYSPACE1 + File.pathSeparator() + CF_STANDARD1);
- assert dataDir.tryCreateDirectories();
+ File dataDir = dataDir(CF_STANDARD1);
TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD1);
String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
@@ -230,25 +248,31 @@ public class SSTableLoaderTest
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
Util.flush(cfs); // wait for sstables to be on disk else we won't be able to stream them
- final CountDownLatch latch = new CountDownLatch(1);
- SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false), 1, KEYSPACE2);
- loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
+ for (String table : new String[] { CF_STANDARD2, null }) // null: table name comes from the data directory (CF_STANDARD1)
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false), 1, KEYSPACE2, table);
+ loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
- cfs = Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1);
- Util.flush(cfs);
+ String targetTable = table == null ? CF_STANDARD1 : table;
+ cfs = Keyspace.open(KEYSPACE2).getColumnFamilyStore(targetTable);
+ Util.flush(cfs);
- List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
+ List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
- assertEquals(1, partitions.size());
- assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
- assert metadata != null;
- assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
- .getCell(metadata.getColumn(ByteBufferUtil.bytes("val")))
- .buffer());
+ assertEquals(1, partitions.size());
+ assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
+ assert metadata != null;
- // The stream future is signalled when the work is complete but before releasing references. Wait for release
- // before cleanup (CASSANDRA-10118).
- latch.await();
+ Row row = partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")));
+ assert row != null;
+
+ assertEquals(ByteBufferUtil.bytes("100"), row.getCell(metadata.getColumn(ByteBufferUtil.bytes("val"))).buffer());
+
+ // The stream future is signalled when the work is complete but before releasing references. Wait for release
+ // before cleanup (CASSANDRA-10118).
+ latch.await();
+ }
}
@Test
@@ -278,9 +302,11 @@ public class SSTableLoaderTest
assertEquals(1, partitions.size());
assertEquals("key", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
assert metadata != null;
- assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
- .getCell(metadata.getColumn(ByteBufferUtil.bytes("val")))
- .buffer());
+
+ Row row = partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")));
+ assert row != null;
+
+ assertEquals(ByteBufferUtil.bytes("100"), row.getCell(metadata.getColumn(ByteBufferUtil.bytes("val"))).buffer());
// The stream future is signalled when the work is complete but before releasing references. Wait for release
// before cleanup (CASSANDRA-10118).
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org