You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yc...@apache.org on 2022/06/27 17:49:32 UTC
[cassandra-sidecar] branch trunk updated: Optimize file path builder and have separate handler for streaming file
This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 24a08f2 Optimize file path builder and have separate handler for streaming file
24a08f2 is described below
commit 24a08f22707901f7641e48f0c26e54b05c0e03c3
Author: Saranya Krishnakumar <sa...@apple.com>
AuthorDate: Mon Jun 27 10:47:26 2022 -0700
Optimize file path builder and have separate handler for streaming file
patch by Francisco Guerrero, Saranya Krishnakumar; reviewed by Yifan Cai, Dinesh Joshi for CASSANDRASC-37
---
.circleci/config.yml | 12 +-
common/build.gradle | 1 +
.../sidecar/common/data/QualifiedTableName.java | 48 ++
.../sidecar/common/data/SSTableComponent.java | 45 ++
.../common/data/StreamSSTableComponentRequest.java | 47 ++
.../sidecar/common/utils/ValidationUtils.java | 85 +++
.../sidecar/common/ValidationUtilsTest.java | 167 +++++
gradle.properties | 2 +-
.../org/apache/cassandra/sidecar/MainModule.java | 28 +-
.../sidecar/cluster/instance/InstanceMetadata.java | 6 -
.../cluster/instance/InstanceMetadataImpl.java | 9 -
.../cassandra/sidecar/models/ComponentInfo.java | 40 --
.../cassandra/sidecar/models/HttpResponse.java | 55 +-
.../sidecar/routes/FileStreamHandler.java | 95 +++
.../sidecar/routes/StreamSSTableComponent.java | 131 ----
.../routes/StreamSSTableComponentHandler.java | 99 +++
.../sidecar/snapshots/SnapshotPathBuilder.java | 279 +++++++++
.../sidecar/utils/CachedFilePathBuilder.java | 217 -------
.../cassandra/sidecar/utils/FilePathBuilder.java | 145 -----
.../cassandra/sidecar/utils/FileStreamer.java | 204 +++++--
.../sidecar/utils/InstanceMetadataFetcher.java | 14 -
.../cassandra/sidecar/FilePathBuilderTest.java | 126 ----
.../sidecar/LoggerHandlerInjectionTest.java | 7 +-
.../sidecar/StreamSSTableComponentTest.java | 76 +--
.../org/apache/cassandra/sidecar/TestModule.java | 11 +-
.../org/apache/cassandra/sidecar/ThrottleTest.java | 6 +-
.../snapshots/AbstractSnapshotPathBuilderTest.java | 672 +++++++++++++++++++++
.../sidecar/snapshots/SnapshotPathBuilderTest.java | 16 +
28 files changed, 1827 insertions(+), 816 deletions(-)
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 51a8555..f5350d1 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -8,7 +8,7 @@ version: 2.1
aliases:
base_job: &base_job
machine:
- image: ubuntu-1604:202007-01
+ image: ubuntu-2004:202010-01
working_directory: ~/repo
environment:
TERM: dumb
@@ -18,6 +18,7 @@ aliases:
working_directory: ~/repo
environment:
TERM: dumb
+ TZ: "America/Los_Angeles"
# we might modify this in the future to accept a parameter for the java package to install
commands:
@@ -71,7 +72,7 @@ jobs:
- checkout
- install_common
- install_kube
-
+
- install_java:
version: adoptopenjdk-8-hotspot
- run: sudo update-java-alternatives -s adoptopenjdk-8-hotspot-amd64 && java -version
@@ -141,9 +142,12 @@ jobs:
rpm_build_install:
<<: *centos
steps:
+ - run: sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-*
+ - run: sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
+ - run: dnf -qy distro-sync
+ - run: dnf -qy install java-11-openjdk git
- checkout
- - run: yum install -y java-11-openjdk-devel # the image uses root by default, no need for sudo
- - run: JAVA_HOME=/usr/lib/jvm/java-11-openjdk ./gradlew -i buildRpm
+ - run: JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.13.0.8-4.el8_5.x86_64 ${PWD}/gradlew -i buildRpm
- run: yum install -y ./build/distributions/cassandra-sidecar*.rpm
- run: test -f /opt/cassandra-sidecar/bin/cassandra-sidecar
diff --git a/common/build.gradle b/common/build.gradle
index 653f9d2..10b24ee 100644
--- a/common/build.gradle
+++ b/common/build.gradle
@@ -22,6 +22,7 @@ test {
}
dependencies {
+ compile "io.vertx:vertx-web:${project.vertxVersion}"
compile 'org.slf4j:slf4j-api:1.7.25'
compile 'ch.qos.logback:logback-core:1.2.3'
compile 'ch.qos.logback:logback-classic:1.2.3'
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/QualifiedTableName.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/QualifiedTableName.java
new file mode 100644
index 0000000..bb2956f
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/QualifiedTableName.java
@@ -0,0 +1,48 @@
+package org.apache.cassandra.sidecar.common.data;
+
+import org.apache.cassandra.sidecar.common.utils.ValidationUtils;
+
+/**
+ * Contains the keyspace and table name in Cassandra. Both names are validated on construction.
+ */
+public class QualifiedTableName
+{
+    private final String keyspace;
+    private final String tableName;
+
+    /**
+     * Constructs a qualified name with the given {@code keyspace} and {@code tableName}
+     *
+     * @param keyspace  the keyspace in Cassandra
+     * @param tableName the table name in Cassandra
+     * @throws NullPointerException when {@code keyspace} or {@code tableName} is {@code null}
+     */
+    public QualifiedTableName(String keyspace, String tableName)
+    {
+        this.keyspace = ValidationUtils.validateKeyspaceName(keyspace);
+        this.tableName = ValidationUtils.validateTableName(tableName);
+    }
+
+    /**
+     * @return the keyspace in Cassandra
+     */
+    public String getKeyspace()
+    {
+        return keyspace;
+    }
+
+    /**
+     * @return the table name in Cassandra
+     */
+    public String getTableName()
+    {
+        return tableName;
+    }
+
+    /** @return the qualified name in {@code keyspace.tableName} form */
+    @Override
+    public String toString()
+    {
+        return keyspace + "." + tableName;
+    }
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableComponent.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableComponent.java
new file mode 100644
index 0000000..ef9e701
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableComponent.java
@@ -0,0 +1,45 @@
+package org.apache.cassandra.sidecar.common.data;
+
+import org.apache.cassandra.sidecar.common.utils.ValidationUtils;
+
+/**
+ * Represents an SSTable component that includes a keyspace, table name and component name
+ */
+public class SSTableComponent extends QualifiedTableName
+{
+ private final String componentName;
+
+ /**
+ * Constructs an SSTable component, validating the component name with {@code ValidationUtils}
+ *
+ * @param keyspace the keyspace in Cassandra
+ * @param tableName the table name in Cassandra
+ * @param componentName the name of the SSTable component
+ */
+ public SSTableComponent(String keyspace, String tableName, String componentName)
+ {
+ super(keyspace, tableName);
+ this.componentName = ValidationUtils.validateComponentName(componentName);
+ }
+
+ /**
+ * @return the name of the SSTable component
+ */
+ public String getComponentName()
+ {
+ return componentName;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString()
+ {
+ return "SSTableComponent{" +
+ "keyspace='" + getKeyspace() + '\'' +
+ ", tableName='" + getTableName() + '\'' +
+ ", componentName='" + componentName + '\'' +
+ '}';
+ }
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/StreamSSTableComponentRequest.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/StreamSSTableComponentRequest.java
new file mode 100644
index 0000000..c6d75fa
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/data/StreamSSTableComponentRequest.java
@@ -0,0 +1,47 @@
+package org.apache.cassandra.sidecar.common.data;
+
+import org.apache.cassandra.sidecar.common.utils.ValidationUtils;
+
+/**
+ * Holder class for the {@code org.apache.cassandra.sidecar.routes.StreamSSTableComponentHandler}
+ * request parameters
+ */
+public class StreamSSTableComponentRequest extends SSTableComponent
+{
+    private final String snapshotName;
+
+    /**
+     * Constructs the request holder, validating every parameter
+     *
+     * @param keyspace      the keyspace in Cassandra
+     * @param tableName     the table name in Cassandra
+     * @param snapshotName  the name of the snapshot
+     * @param componentName the name of the SSTable component
+     * @throws NullPointerException when any of the parameters is {@code null}
+     */
+    public StreamSSTableComponentRequest(String keyspace, String tableName, String snapshotName, String componentName)
+    {
+        super(keyspace, tableName, componentName);
+        this.snapshotName = ValidationUtils.validateSnapshotName(snapshotName);
+    }
+
+    /**
+     * @return the name of the snapshot
+     */
+    public String getSnapshotName()
+    {
+        return snapshotName;
+    }
+
+    /** @return a string with the keyspace, table name, snapshot name and component name of the request */
+    @Override
+    public String toString()
+    {
+        return "StreamSSTableComponentRequest{" +
+               "keyspace='" + getKeyspace() + '\'' +
+               ", tableName='" + getTableName() + '\'' +
+               ", snapshot='" + snapshotName + '\'' +
+               ", componentName='" + getComponentName() + '\'' +
+               '}';
+    }
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/utils/ValidationUtils.java b/common/src/main/java/org/apache/cassandra/sidecar/common/utils/ValidationUtils.java
new file mode 100644
index 0000000..345f38c
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/utils/ValidationUtils.java
@@ -0,0 +1,140 @@
+package org.apache.cassandra.sidecar.common.utils;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.ext.web.handler.HttpException;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Miscellaneous methods used for validation.
+ */
+public class ValidationUtils
+{
+    private static final Set<String> FORBIDDEN_DIRS = new HashSet<>(Arrays.asList("system_schema",
+                                                                                  "system_traces",
+                                                                                  "system_distributed",
+                                                                                  "system",
+                                                                                  "system_auth",
+                                                                                  "system_views",
+                                                                                  "system_virtual_schema"));
+    private static final String CHARS_ALLOWED_PATTERN = "[a-zA-Z0-9_-]+";
+    private static final Pattern PATTERN_WORD_CHARS = Pattern.compile(CHARS_ALLOWED_PATTERN);
+    // The dot in front of each extension is escaped: an unescaped '.' matches any character, so names
+    // such as "a/db" or "fooXdb" would be accepted. The patterns are compiled once and reused.
+    private static final Pattern REGEX_COMPONENT =
+    Pattern.compile(CHARS_ALLOWED_PATTERN + "(\\.db|\\.cql|\\.json|\\.crc32|TOC\\.txt)");
+    private static final Pattern REGEX_DB_TOC_COMPONENT =
+    Pattern.compile(CHARS_ALLOWED_PATTERN + "(\\.db|TOC\\.txt)");
+
+    /**
+     * Validates that the {@code keyspace} is not {@code null}, contains only allowed characters
+     * and is not one of the forbidden keyspaces
+     *
+     * @param keyspace the name of the Cassandra keyspace to validate
+     * @return the validated {@code keyspace}
+     * @throws NullPointerException when the {@code keyspace} is {@code null}
+     * @throws HttpException        with {@code BAD_REQUEST} status when the {@code keyspace} has invalid
+     *                              characters, or {@code FORBIDDEN} status when it is a forbidden keyspace
+     */
+    public static String validateKeyspaceName(final String keyspace)
+    {
+        Objects.requireNonNull(keyspace, "keyspace must not be null");
+        validatePattern(keyspace, "keyspace");
+        if (FORBIDDEN_DIRS.contains(keyspace))
+            throw new HttpException(HttpResponseStatus.FORBIDDEN.code(), "Forbidden keyspace: " + keyspace);
+        return keyspace;
+    }
+
+    /**
+     * Validates that the {@code tableName} is not {@code null} and contains only allowed characters
+     *
+     * @param tableName the name of the Cassandra table to validate
+     * @return the validated {@code tableName}
+     * @throws NullPointerException when the {@code tableName} is {@code null}
+     * @throws HttpException        with {@code BAD_REQUEST} status when it has invalid characters
+     */
+    public static String validateTableName(final String tableName)
+    {
+        Objects.requireNonNull(tableName, "tableName must not be null");
+        validatePattern(tableName, "table name");
+        return tableName;
+    }
+
+    /**
+     * Validates that the {@code snapshotName} is not {@code null} and contains neither the platform
+     * file separator nor the null character
+     *
+     * @param snapshotName the name of the snapshot to validate
+     * @return the validated {@code snapshotName}
+     * @throws NullPointerException when the {@code snapshotName} is {@code null}
+     * @throws HttpException        with {@code BAD_REQUEST} status when it has invalid characters
+     */
+    public static String validateSnapshotName(final String snapshotName)
+    {
+        Objects.requireNonNull(snapshotName, "snapshotName must not be null");
+        // most UNIX systems only disallow file separator and null characters for directory names
+        if (snapshotName.contains(File.separator) || snapshotName.contains("\0"))
+            throw new HttpException(HttpResponseStatus.BAD_REQUEST.code(),
+                                    "Invalid characters in snapshot name: " + snapshotName);
+        return snapshotName;
+    }
+
+    /**
+     * Validates that the {@code componentName} is not {@code null} and consists of allowed characters
+     * followed by one of {@code .db}, {@code .cql}, {@code .json}, {@code .crc32} or {@code TOC.txt}
+     *
+     * @param componentName the name of the SSTable component to validate
+     * @return the validated {@code componentName}
+     * @throws NullPointerException when the {@code componentName} is {@code null}
+     * @throws HttpException        with {@code BAD_REQUEST} status when the name is not valid
+     */
+    public static String validateComponentName(String componentName)
+    {
+        return validateComponentNameByRegex(componentName, REGEX_COMPONENT);
+    }
+
+    /**
+     * Validates that the {@code componentName} is not {@code null} and consists of allowed characters
+     * followed by {@code .db} or {@code TOC.txt}
+     *
+     * @param componentName the name of the SSTable component to validate
+     * @return the validated {@code componentName}
+     * @throws NullPointerException when the {@code componentName} is {@code null}
+     * @throws HttpException        with {@code BAD_REQUEST} status when the name is not valid
+     */
+    public static String validateDbOrTOCComponentName(String componentName)
+    {
+        return validateComponentNameByRegex(componentName, REGEX_DB_TOC_COMPONENT);
+    }
+
+    /**
+     * Returns the {@code componentName} when it fully matches the given {@code pattern}
+     */
+    @NotNull
+    private static String validateComponentNameByRegex(String componentName, Pattern pattern)
+    {
+        Objects.requireNonNull(componentName, "componentName must not be null");
+        if (!pattern.matcher(componentName).matches())
+            throw new HttpException(HttpResponseStatus.BAD_REQUEST.code(),
+                                    "Invalid component name: " + componentName);
+        return componentName;
+    }
+
+    /**
+     * Fails with a {@code BAD_REQUEST} when the {@code input} has characters outside of the allowed set
+     */
+    private static void validatePattern(String input, String name)
+    {
+        final Matcher matcher = PATTERN_WORD_CHARS.matcher(input);
+        if (!matcher.matches())
+            throw new HttpException(HttpResponseStatus.BAD_REQUEST.code(),
+                                    "Invalid characters in " + name + ": " + input);
+    }
+}
diff --git a/common/src/test/java/org/apache/cassandra/sidecar/common/ValidationUtilsTest.java b/common/src/test/java/org/apache/cassandra/sidecar/common/ValidationUtilsTest.java
new file mode 100644
index 0000000..c262f86
--- /dev/null
+++ b/common/src/test/java/org/apache/cassandra/sidecar/common/ValidationUtilsTest.java
@@ -0,0 +1,167 @@
+package org.apache.cassandra.sidecar.common;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.utils.ValidationUtils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test validation methods.
+ */
+public class ValidationUtilsTest
+{
+
+ private void testCommon_invalidCharacters(String testName)
+ {
+ HttpException httpEx = Assertions.assertThrows(HttpException.class, () ->
+ {
+ ValidationUtils.validateTableName(testName);
+ });
+ assertEquals(HttpResponseStatus.BAD_REQUEST.code(), httpEx.getStatusCode());
+ assertEquals("Invalid characters in table name: " + testName, httpEx.getPayload());
+ }
+
+ @Test
+ public void testValidateCharacters_validParams_expectNoException()
+ {
+ ValidationUtils.validateTableName("test_table_name");
+ ValidationUtils.validateTableName("test-table-name");
+ ValidationUtils.validateTableName("testTableName");
+ }
+
+ @Test
+ public void testValidateCharacters_paramWithColon_expectException()
+ {
+ testCommon_invalidCharacters("test:table_name");
+ }
+
+ @Test
+ public void testValidateCharacters_paramWithDollar_expectException()
+ {
+ testCommon_invalidCharacters("test-table$name");
+ }
+
+ @Test
+ public void testValidateCharacters_paramsWithSlash_expectException()
+ {
+ testCommon_invalidCharacters("testTable/Name");
+ }
+
+
+ @Test
+ public void testValidateKeyspaceName_validKeyspaceNames_expectNoException()
+ {
+ ValidationUtils.validateKeyspaceName("system-views");
+ ValidationUtils.validateKeyspaceName("SystemViews");
+ ValidationUtils.validateKeyspaceName("system_views_test");
+ }
+
+ @Test
+ public void testValidateKeyspaceName_forbiddenKeyspaceName_expectException()
+ {
+ String testKS = "system_views";
+ HttpException httpEx = Assertions.assertThrows(HttpException.class, () ->
+ {
+ ValidationUtils.validateKeyspaceName(testKS);
+ });
+ assertEquals(HttpResponseStatus.FORBIDDEN.code(), httpEx.getStatusCode());
+ assertEquals("Forbidden keyspace: " + testKS, httpEx.getPayload());
+ }
+
+ @Test
+ public void testValidateKeyspaceName_keyspaceNameWithSpace_expectException()
+ {
+ String testKS = "test keyspace";
+ HttpException httpEx = Assertions.assertThrows(HttpException.class, () ->
+ {
+ ValidationUtils.validateKeyspaceName(testKS);
+ });
+ assertEquals(HttpResponseStatus.BAD_REQUEST.code(), httpEx.getStatusCode());
+ assertEquals("Invalid characters in keyspace: " + testKS, httpEx.getPayload());
+ }
+
+
+ @Test
+ public void testValidateFileName_validFileNames_expectNoException()
+ {
+ ValidationUtils.validateComponentName("test-file-name.db");
+ ValidationUtils.validateComponentName("test_file_name.json");
+ ValidationUtils.validateComponentName("testFileName.cql");
+ ValidationUtils.validateComponentName("t_TOC.txt");
+ ValidationUtils.validateComponentName("crcfile.crc32");
+ }
+
+ private void testCommon_testInvalidFileName(String testFileName)
+ {
+ HttpException httpEx = Assertions.assertThrows(HttpException.class, () ->
+ {
+ ValidationUtils.validateComponentName(testFileName);
+ });
+ assertEquals(HttpResponseStatus.BAD_REQUEST.code(), httpEx.getStatusCode());
+ assertEquals("Invalid component name: " + testFileName, httpEx.getPayload());
+ }
+
+ @Test
+ public void testValidateFileName_withoutExtension_expectException()
+ {
+ testCommon_testInvalidFileName("test-file-name");
+ }
+
+ @Test
+ public void testValidateFileName_incorrectExtension_expectException()
+ {
+ testCommon_testInvalidFileName("test-file-name.db1");
+ }
+
+ @Test
+ public void testValidateFileName_incorrectCrcExtension_expectException()
+ {
+ testCommon_testInvalidFileName("crcfile.crc64");
+ }
+
+ @Test
+ public void testValidateFileName_withoutFileName_expectException()
+ {
+ testCommon_testInvalidFileName("TOC.txt");
+ }
+
+
+ @Test
+ public void testValidateSnapshotName_validSnapshotNames_expectNoException()
+ {
+ ValidationUtils.validateSnapshotName("valid-snapshot-name");
+ ValidationUtils.validateSnapshotName("valid\\snapshot\\name"); // valid on UNIX: only File.separator and the null character are rejected
+ ValidationUtils.validateSnapshotName("valid:snapshot:name");
+ ValidationUtils.validateSnapshotName("valid$snapshot$name");
+ ValidationUtils.validateSnapshotName("valid snapshot name");
+ }
+
+ @Test
+ public void testValidateSnapshotName_snapshotNameWithSlash_expectException()
+ {
+ String testSnapName = "valid" + '/' + "snapshotname";
+ HttpException httpEx = Assertions.assertThrows(HttpException.class, () ->
+ {
+ ValidationUtils.validateSnapshotName(testSnapName);
+ });
+ assertEquals(HttpResponseStatus.BAD_REQUEST.code(), httpEx.getStatusCode());
+ assertEquals("Invalid characters in snapshot name: " + testSnapName, httpEx.getPayload());
+ }
+
+ @Test
+ public void testValidateSnapshotName_snapshotNameWithNullChar_expectException()
+ {
+ String testSnapName = "valid" + '\0' + "snapshotname";
+ HttpException httpEx = Assertions.assertThrows(HttpException.class, () ->
+ {
+ ValidationUtils.validateSnapshotName(testSnapName);
+ });
+ assertEquals(HttpResponseStatus.BAD_REQUEST.code(), httpEx.getStatusCode());
+ assertEquals("Invalid characters in snapshot name: " + testSnapName, httpEx.getPayload());
+ }
+
+}
diff --git a/gradle.properties b/gradle.properties
index f004357..7a32a30 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,5 +1,5 @@
version=1.0-SNAPSHOT
junitVersion=5.4.2
kubernetesClientVersion=9.0.0
-cassandra40Version=4.0.3
+cassandra40Version=4.0.4
vertxVersion=4.2.1
diff --git a/src/main/java/org/apache/cassandra/sidecar/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/MainModule.java
index 6925eb8..2c914d5 100644
--- a/src/main/java/org/apache/cassandra/sidecar/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/MainModule.java
@@ -41,6 +41,7 @@ import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
+import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.JksOptions;
@@ -57,8 +58,9 @@ import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadataImpl;
import org.apache.cassandra.sidecar.common.CQLSession;
import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
import org.apache.cassandra.sidecar.routes.CassandraHealthService;
+import org.apache.cassandra.sidecar.routes.FileStreamHandler;
import org.apache.cassandra.sidecar.routes.HealthService;
-import org.apache.cassandra.sidecar.routes.StreamSSTableComponent;
+import org.apache.cassandra.sidecar.routes.StreamSSTableComponentHandler;
import org.apache.cassandra.sidecar.routes.SwaggerOpenApiResource;
import org.apache.cassandra.sidecar.utils.YAMLKeyConstants;
import org.jboss.resteasy.plugins.server.vertx.VertxRegistry;
@@ -71,6 +73,7 @@ import org.jboss.resteasy.plugins.server.vertx.VertxResteasyDeployment;
public class MainModule extends AbstractModule
{
private static final Logger logger = LoggerFactory.getLogger(MainModule.class);
+ private static final String V1_API_VERSION = "/api/v1";
@Provides
@Singleton
@@ -113,7 +116,6 @@ public class MainModule extends AbstractModule
@Singleton
private VertxRequestHandler configureServices(Vertx vertx,
HealthService healthService,
- StreamSSTableComponent ssTableComponent,
CassandraHealthService cassandraHealthService)
{
VertxResteasyDeployment deployment = new VertxResteasyDeployment();
@@ -122,7 +124,6 @@ public class MainModule extends AbstractModule
r.addPerInstanceResource(SwaggerOpenApiResource.class);
r.addSingletonResource(healthService);
- r.addSingletonResource(ssTableComponent);
r.addSingletonResource(cassandraHealthService);
return new VertxRequestHandler(vertx, deployment);
@@ -130,7 +131,11 @@ public class MainModule extends AbstractModule
@Provides
@Singleton
- public Router vertxRouter(Vertx vertx, LoggerHandler loggerHandler, ErrorHandler errorHandler)
+ public Router vertxRouter(Vertx vertx,
+ StreamSSTableComponentHandler streamSSTableComponentHandler,
+ FileStreamHandler fileStreamHandler,
+ LoggerHandler loggerHandler,
+ ErrorHandler errorHandler)
{
Router router = Router.router(vertx);
router.route()
@@ -144,6 +149,21 @@ public class MainModule extends AbstractModule
// Docs index.html page
StaticHandler docs = StaticHandler.create("docs");
router.route().path("/docs/*").handler(docs);
+
+ // add custom routers
+ final String componentRoute = "/keyspace/:keyspace/table/:table/snapshots/:snapshot/component/:component";
+ final String defaultStreamRoute = V1_API_VERSION + componentRoute;
+ final String instanceSpecificStreamRoute = V1_API_VERSION + "/instance/:instanceId" + componentRoute;
+ router.route().method(HttpMethod.GET)
+ .path(defaultStreamRoute)
+ .handler(streamSSTableComponentHandler::handleAllRequests)
+ .handler(fileStreamHandler);
+
+ router.route().method(HttpMethod.GET)
+ .path(instanceSpecificStreamRoute)
+ .handler(streamSSTableComponentHandler::handlePerInstanceRequests)
+ .handler(fileStreamHandler);
+
return router;
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
index 9081cb9..2f53175 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
@@ -4,7 +4,6 @@ import java.util.List;
import org.apache.cassandra.sidecar.common.CQLSession;
import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
-import org.apache.cassandra.sidecar.utils.FilePathBuilder;
/**
* Metadata of an instance
@@ -40,9 +39,4 @@ public interface InstanceMetadata
* Delegate specific for the instance.
*/
CassandraAdapterDelegate delegate();
-
- /**
- * Maintain one path builder for one instance.
- */
- FilePathBuilder pathBuilder();
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
index 987fdc2..8f3b566 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
@@ -5,8 +5,6 @@ import java.util.List;
import org.apache.cassandra.sidecar.common.CQLSession;
import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
-import org.apache.cassandra.sidecar.utils.CachedFilePathBuilder;
-import org.apache.cassandra.sidecar.utils.FilePathBuilder;
/**
* Local implementation of InstanceMetadata.
@@ -19,7 +17,6 @@ public class InstanceMetadataImpl implements InstanceMetadata
private final List<String> dataDirs;
private final CQLSession session;
private final CassandraAdapterDelegate delegate;
- private final FilePathBuilder pathBuilder;
public InstanceMetadataImpl(int id, String host, int port, List<String> dataDirs, CQLSession session,
CassandraVersionProvider versionProvider, int healthCheckFrequencyMillis)
@@ -28,7 +25,6 @@ public class InstanceMetadataImpl implements InstanceMetadata
this.host = host;
this.port = port;
this.dataDirs = dataDirs;
- this.pathBuilder = new CachedFilePathBuilder(dataDirs);
this.session = new CQLSession(host, port, healthCheckFrequencyMillis);
this.delegate = new CassandraAdapterDelegate(versionProvider, session, healthCheckFrequencyMillis);
@@ -63,9 +59,4 @@ public class InstanceMetadataImpl implements InstanceMetadata
{
return delegate;
}
-
- public FilePathBuilder pathBuilder()
- {
- return pathBuilder;
- }
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/models/ComponentInfo.java b/src/main/java/org/apache/cassandra/sidecar/models/ComponentInfo.java
deleted file mode 100644
index 0588ce7..0000000
--- a/src/main/java/org/apache/cassandra/sidecar/models/ComponentInfo.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.cassandra.sidecar.models;
-
-/**
- * Stores information needed to identify a SStable component.
- */
-public class ComponentInfo
-{
- private final String keyspace;
- private final String table;
- private final String snapshot;
- private final String component;
-
- public ComponentInfo(String keyspace, String table, String snapshot, String component)
- {
- this.keyspace = keyspace;
- this.table = table;
- this.snapshot = snapshot;
- this.component = component;
- }
-
- public String getKeyspace()
- {
- return keyspace;
- }
-
- public String getTable()
- {
- return table;
- }
-
- public String getSnapshot()
- {
- return snapshot;
- }
-
- public String getComponent()
- {
- return component;
- }
-}
diff --git a/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java b/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java
index 708221e..ffeebf5 100644
--- a/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java
+++ b/src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java
@@ -1,24 +1,31 @@
package org.apache.cassandra.sidecar.models;
-import java.io.File;
-
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
+import io.vertx.core.net.SocketAddress;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static org.apache.cassandra.sidecar.utils.RequestUtils.extractHostAddressWithoutPort;
/**
* Wrapper around HttpServerResponse
*/
public class HttpResponse
{
+ private final String host;
+ private final HttpServerRequest request;
private final HttpServerResponse response;
- public HttpResponse(HttpServerResponse response)
+ public HttpResponse(HttpServerRequest request, HttpServerResponse response)
{
+ this.request = request;
this.response = response;
+ this.host = extractHostAddressWithoutPort(request.host());
}
public void setRetryAfterHeader(long microsToWait)
@@ -63,26 +70,42 @@ public class HttpResponse
response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).setStatusMessage(msg).end();
}
- public void sendFile(File file)
+ /**
+ * Send a range in a file asynchronously
+ *
+ * @param fileName file to send
+ * @param fileLength the size of the file to send
+ * @param range range to send
+ * @return a future completed with the body result
+ */
+ public Future<Void> sendFile(String fileName, long fileLength, Range range)
{
- response.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM)
- .putHeader(HttpHeaderNames.CONTENT_LENGTH, Long.toString(file.length()))
- .sendFile(file.getAbsolutePath());
- }
+ // notify client we support range requests
+ response.putHeader(HttpHeaders.ACCEPT_RANGES, "bytes");
- public void sendFile(File file, Range range)
- {
- if (range.length() != file.length())
+ if (range.length() != fileLength)
{
setPartialContentStatus(range);
}
- response.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM)
- .putHeader(HttpHeaderNames.CONTENT_LENGTH, Long.toString(range.length()))
- .sendFile(file.getAbsolutePath(), range.start(), range.length());
+
+ return response.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM)
+ .putHeader(HttpHeaderNames.CONTENT_LENGTH, Long.toString(range.length()))
+ .sendFile(fileName, range.start(), range.length());
+ }
+
+ /**
+ * @return the remote address for this connection, possibly {@code null} (e.g a server bound on a domain socket).
+ */
+ public SocketAddress remoteAddress()
+ {
+ return request.remoteAddress();
}
- public void setForbiddenStatus(String msg)
+ /**
+ * @return the request host without the port
+ */
+ public String host()
{
- response.setStatusCode(HttpResponseStatus.FORBIDDEN.code()).setStatusMessage(msg).end();
+ return host;
}
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/FileStreamHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/FileStreamHandler.java
new file mode 100644
index 0000000..b0c02b8
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/FileStreamHandler.java
@@ -0,0 +1,95 @@
+package org.apache.cassandra.sidecar.routes;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.file.FileProps;
+import io.vertx.core.file.FileSystem;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.models.HttpResponse;
+import org.apache.cassandra.sidecar.utils.FileStreamer;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE;
+import static org.apache.cassandra.sidecar.utils.RequestUtils.extractHostAddressWithoutPort;
+
+/**
+ * Handler for sending out the file stored in the routing context under {@link #FILE_PATH_CONTEXT_KEY}.
+ */
+public class FileStreamHandler implements Handler<RoutingContext>
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(FileStreamHandler.class);
+    public static final String FILE_PATH_CONTEXT_KEY = "fileToTransfer";
+
+    private final FileStreamer fileStreamer;
+
+    @Inject
+    public FileStreamHandler(final FileStreamer fileStreamer)
+    {
+        this.fileStreamer = fileStreamer;
+    }
+
+    @Override
+    public void handle(RoutingContext context)
+    {
+        final String localFile = context.get(FILE_PATH_CONTEXT_KEY);
+        final HttpServerRequest request = context.request();
+        final String host = extractHostAddressWithoutPort(request.host());
+
+        LOGGER.debug("FileStreamHandler handle file transfer '{}' for client: {}. Instance: {}", localFile,
+                     request.remoteAddress(), host);
+
+        FileSystem fs = context.vertx().fileSystem();
+        fs.exists(localFile)
+          .compose(exists -> ensureValidFile(fs, localFile, exists))
+          .compose(fileProps -> fileStreamer.stream(new HttpResponse(request, context.response()), localFile,
+                                                    fileProps.size(), request.getHeader(HttpHeaderNames.RANGE)))
+          .onSuccess(v -> LOGGER.debug("Completed streaming file '{}'", localFile))
+          .onFailure(context::fail);
+    }
+
+    /**
+     * Ensures that the file exists and is a non-empty regular file
+     *
+     * @param fs        The underlying filesystem
+     * @param localFile The path to the file in the filesystem
+     * @param exists    Whether the file exists or not
+     * @return a succeeded future with the {@link FileProps}, or a failed future if the file does not exist;
+     * is not a regular file; or if the file is empty
+     */
+    private Future<FileProps> ensureValidFile(FileSystem fs, String localFile, Boolean exists)
+    {
+        if (!exists)
+        {
+            LOGGER.error("The requested file '{}' does not exist", localFile);
+            return Future.failedFuture(new HttpException(NOT_FOUND.code(), "The requested file does not exist"));
+        }
+
+        return fs.props(localFile)
+                 .compose(fileProps ->
+                 {
+                     if (fileProps == null || !fileProps.isRegularFile())
+                     {
+                         // Not a regular file: log the real cause, but keep answering NOT_FOUND to the client
+                         LOGGER.error("The requested file '{}' is not a regular file", localFile);
+                         return Future.failedFuture(new HttpException(NOT_FOUND.code(),
+                                                                      "The requested file does not exist"));
+                     }
+
+                     if (fileProps.size() <= 0)
+                     {
+                         LOGGER.error("The requested file '{}' has 0 size", localFile);
+                         return Future.failedFuture(new HttpException(REQUESTED_RANGE_NOT_SATISFIABLE.code(),
+                                                                      "The requested file is empty"));
+                     }
+
+                     return Future.succeededFuture(fileProps);
+                 });
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponent.java b/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponent.java
deleted file mode 100644
index dc4596e..0000000
--- a/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponent.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package org.apache.cassandra.sidecar.routes;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Pattern;
-import javax.ws.rs.GET;
-import javax.ws.rs.HeaderParam;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.core.Context;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import io.vertx.core.http.HttpServerRequest;
-import io.vertx.core.http.HttpServerResponse;
-import org.apache.cassandra.sidecar.models.ComponentInfo;
-import org.apache.cassandra.sidecar.models.HttpResponse;
-import org.apache.cassandra.sidecar.models.Range;
-import org.apache.cassandra.sidecar.utils.FilePathBuilder;
-import org.apache.cassandra.sidecar.utils.FileStreamer;
-import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
-
-import static org.apache.cassandra.sidecar.utils.RequestUtils.extractHostAddressWithoutPort;
-
-/**
- * Handler for serving SSTable components from snapshot folders
- */
-@Singleton
-@javax.ws.rs.Path("/api")
-public class StreamSSTableComponent
-{
- private static final Pattern REGEX_DIR = Pattern.compile("[a-zA-Z0-9_-]+");
- private static final Pattern REGEX_COMPONENT = Pattern.compile("[a-zA-Z0-9_-]+(.db|.cql|.json|.crc32|TOC.txt)");
- private static final Set<String> FORBIDDEN_DIRS = new HashSet<>(
- Arrays.asList("system_schema", "system_traces", "system_distributed", "system", "system_auth"));
-
- private final InstanceMetadataFetcher metadataFetcher;
- private final FileStreamer fileStreamer;
-
- @Inject
- public StreamSSTableComponent(final InstanceMetadataFetcher metadataFetcher, final FileStreamer fileStreamer)
- {
- this.metadataFetcher = metadataFetcher;
- this.fileStreamer = fileStreamer;
- }
-
- @GET
- @javax.ws.rs.Path("/v1/stream/keyspace/{keyspace}/table/{table}/snapshot/{snapshot}/component/{component}")
- public void streamFromFirstInstance(@PathParam("keyspace") String keyspace, @PathParam("table") String table,
- @PathParam("snapshot") String snapshot,
- @PathParam("component") String component, @HeaderParam("Range") String range,
- @Context HttpServerResponse resp, @Context HttpServerRequest req)
- {
- final String host = extractHostAddressWithoutPort(req.host());
- stream(new ComponentInfo(keyspace, table, snapshot, component), range, null, host, resp);
- }
-
- @GET
- @javax.ws.rs.Path
- ("/v1/stream/instance/{instanceId}/keyspace/{keyspace}/table/{table}/snapshot/{snapshot}/component/{component}")
- public void streamFromSpecificInstance(@PathParam("keyspace") String keyspace, @PathParam("table") String table,
- @PathParam("snapshot") String snapshot,
- @PathParam("component") String component,
- @PathParam("instanceId") Integer instanceId,
- @HeaderParam("Range") String range, @Context HttpServerResponse resp)
- {
- stream(new ComponentInfo(keyspace, table, snapshot, component), range, instanceId, null, resp);
- }
-
- private void stream(ComponentInfo componentInfo, String range, Integer instanceId,
- String host, HttpServerResponse resp)
- {
- final HttpResponse response = new HttpResponse(resp);
- if (FORBIDDEN_DIRS.contains(componentInfo.getKeyspace()))
- {
- response.setForbiddenStatus(componentInfo.getKeyspace() + " keyspace is forbidden");
- return;
- }
- if (!arePathParamsValid(componentInfo))
- {
- response.setBadRequestStatus("Invalid path params found");
- return;
- }
-
- final Path path;
- final FilePathBuilder pathBuilder = instanceId == null
- ? metadataFetcher.getPathBuilder(host)
- : metadataFetcher.getPathBuilder(instanceId);
- try
- {
- path = pathBuilder.build(componentInfo.getKeyspace(), componentInfo.getTable(),
- componentInfo.getSnapshot(), componentInfo.getComponent());
- }
- catch (FileNotFoundException e)
- {
- response.setNotFoundStatus(e.getMessage());
- return;
- }
- final File file = path.toFile();
- final Range r;
- try
- {
- r = parseRangeHeader(range, file.length());
- }
- catch (Exception e)
- {
- response.setRangeNotSatisfiable(e.getMessage());
- return;
- }
- fileStreamer.stream(response, file, r);
- }
-
- private boolean arePathParamsValid(ComponentInfo componentInfo)
- {
- return REGEX_DIR.matcher(componentInfo.getKeyspace()).matches()
- && REGEX_DIR.matcher(componentInfo.getTable()).matches()
- && REGEX_DIR.matcher(componentInfo.getSnapshot()).matches()
- && REGEX_COMPONENT.matcher(componentInfo.getComponent()).matches();
- }
-
- private Range parseRangeHeader(String rangeHeader, long fileSize)
- {
- final Range fileRange = new Range(0, fileSize - 1, fileSize);
- // sidecar does not support multiple ranges as of now
- final Range headerRange = Range.parseHeader(rangeHeader, fileSize);
- return fileRange.intersect(headerRange);
- }
-}
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java
new file mode 100644
index 0000000..517baac
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java
@@ -0,0 +1,99 @@
+package org.apache.cassandra.sidecar.routes;
+
+import java.io.FileNotFoundException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.common.data.StreamSSTableComponentRequest;
+import org.apache.cassandra.sidecar.snapshots.SnapshotPathBuilder;
+
+import static org.apache.cassandra.sidecar.utils.RequestUtils.extractHostAddressWithoutPort;
+
+/**
+ * This handler validates that the component exists in the cluster and sets up the context
+ * for the {@link FileStreamHandler} to stream the component back to the client
+ */
+@Singleton
+public class StreamSSTableComponentHandler
+{
+ private static final Logger logger = LoggerFactory.getLogger(StreamSSTableComponentHandler.class);
+
+ private final SnapshotPathBuilder snapshotPathBuilder;
+ private final InstancesConfig instancesConfig;
+
+ @Inject
+ public StreamSSTableComponentHandler(SnapshotPathBuilder snapshotPathBuilder, InstancesConfig instancesConfig)
+ {
+ this.snapshotPathBuilder = snapshotPathBuilder;
+ this.instancesConfig = instancesConfig;
+ }
+
+ public void handleAllRequests(RoutingContext context)
+ {
+ final HttpServerRequest request = context.request();
+ final String host = extractHostAddressWithoutPort(request.host());
+ streamFilesForHost(host, context);
+ }
+
+ /**
+  * Handles requests that address a specific instance through the {@code InstanceId} request parameter.
+  * The request is failed with {@code 400 Bad Request} when the parameter is missing or is not a valid
+  * integer; otherwise the host of that instance is looked up and the request is delegated to
+  * {@link #streamFilesForHost(String, RoutingContext)}.
+  *
+  * @param context the routing context of the request
+  */
+ public void handlePerInstanceRequests(RoutingContext context)
+ {
+     final String instanceIdParam = context.request().getParam("InstanceId");
+     if (instanceIdParam == null)
+     {
+         context.fail(new HttpException(HttpResponseStatus.BAD_REQUEST.code(),
+                                        "InstanceId path parameter must be provided"));
+         return;
+     }
+
+     final int instanceId;
+     try
+     {
+         instanceId = Integer.parseInt(instanceIdParam);
+     }
+     catch (NumberFormatException e)
+     {
+         // A malformed id is a client error; without this guard the exception escapes the handler
+         context.fail(new HttpException(HttpResponseStatus.BAD_REQUEST.code(),
+                                        "InstanceId path parameter must be a valid integer", e));
+         return;
+     }
+
+     final String host = instancesConfig.instanceFromId(instanceId).host();
+     streamFilesForHost(host, context);
+ }
+
+ /**
+  * Resolves the path of the requested SSTable component on the given {@code host}. On success the path
+  * is stored in the routing context under {@link FileStreamHandler#FILE_PATH_CONTEXT_KEY} and the next
+  * handler is invoked. On failure the request is failed with {@code 404 Not Found} when the cause is a
+  * {@link FileNotFoundException}, or with {@code 400 Bad Request} otherwise.
+  *
+  * @param host    the host whose data directories are searched
+  * @param context the routing context of the request
+  */
+ public void streamFilesForHost(String host, RoutingContext context)
+ {
+     final SocketAddress remoteAddress = context.request().remoteAddress();
+     final StreamSSTableComponentRequest requestParams = extractParamsOrThrow(context);
+     logger.info("StreamSSTableComponentHandler received request: {} from: {}. Instance: {}", requestParams,
+                 remoteAddress, host);
+
+     snapshotPathBuilder.build(host, requestParams)
+                        .onSuccess(path ->
+                        {
+                            logger.debug("StreamSSTableComponentHandler handled {} for client {}. Instance: {}", path,
+                                         remoteAddress, host);
+                            context.put(FileStreamHandler.FILE_PATH_CONTEXT_KEY, path)
+                                   .next();
+                        })
+                        .onFailure(cause ->
+                        {
+                            if (cause instanceof FileNotFoundException)
+                            {
+                                context.fail(new HttpException(HttpResponseStatus.NOT_FOUND.code(), cause.getMessage()));
+                            }
+                            else
+                            {
+                                // Keep the underlying failure visible: it is not part of the response payload,
+                                // so without logging it here the reason for the 400 would be lost
+                                logger.warn("StreamSSTableComponentHandler failed for request: {} from: {}. Instance: {}",
+                                            requestParams, remoteAddress, host, cause);
+                                context.fail(new HttpException(HttpResponseStatus.BAD_REQUEST.code(),
+                                                               "Invalid request for " + requestParams, cause));
+                            }
+                        });
+ }
+
+ private StreamSSTableComponentRequest extractParamsOrThrow(final RoutingContext rc)
+ {
+ return new StreamSSTableComponentRequest(rc.pathParam("keyspace"),
+ rc.pathParam("table"),
+ rc.pathParam("snapshot"),
+ rc.pathParam("component")
+ );
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java b/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
new file mode 100644
index 0000000..8cb4a55
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
@@ -0,0 +1,279 @@
+package org.apache.cassandra.sidecar.snapshots;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.file.FileProps;
+import io.vertx.core.file.FileSystem;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.common.data.StreamSSTableComponentRequest;
+import org.apache.cassandra.sidecar.common.utils.ValidationUtils;
+
+/**
+ * This class builds the snapshot path on a given host validating that it exists
+ */
+@Singleton
+public class SnapshotPathBuilder
+{
+ private static final Logger logger = LoggerFactory.getLogger(SnapshotPathBuilder.class);
+ private static final String DATA_SUB_DIR = "/data";
+ public static final String SNAPSHOTS_DIR_NAME = "snapshots";
+ protected final FileSystem fs;
+ protected final InstancesConfig instancesConfig;
+
+ /**
+ * Creates a new SnapshotPathBuilder for snapshots of an instance with the given {@code fs filesystem} and
+ * {@code instancesConfig Cassandra configuration}.
+ *
+ * @param fs the underlying filesystem
+ * @param instancesConfig the configuration for Cassandra
+ */
+ @Inject
+ public SnapshotPathBuilder(FileSystem fs, InstancesConfig instancesConfig)
+ {
+ this.fs = fs;
+ this.instancesConfig = instancesConfig;
+ }
+
+ /**
+ * Builds the path to the given component given the {@code keyspace}, {@code table}, and {@code snapshot}
+ * inside the specified {@code host}. When a table has been dropped and recreated, the code searches for
+ * the latest modified directory for that table.
+ *
+ * @param host the name of the host
+ * @param request the request to stream the SSTable component
+ * @return the absolute path of the component
+ */
+ public Future<String> build(String host, StreamSSTableComponentRequest request)
+ {
+ validate(request);
+ // Search for the file
+ return getDataDirectories(host)
+ .compose(dataDirs -> findKeyspaceDirectory(dataDirs, request.getKeyspace()))
+ .compose(keyspaceDirectory -> findTableDirectory(keyspaceDirectory, request.getTableName()))
+ .compose(tableDirectory -> findComponent(tableDirectory, request.getSnapshotName(),
+ request.getComponentName()));
+ }
+
+ /**
+ * Validates that the component name is either {@code *.db} or a {@code *-TOC.txt}
+ * which are the only required components to read SSTables.
+ *
+ * @param request the request to stream the SSTable component
+ */
+ protected void validate(StreamSSTableComponentRequest request)
+ {
+ // Only allow .db and TOC.txt components here
+ ValidationUtils.validateDbOrTOCComponentName(request.getComponentName());
+ }
+
+ /**
+ * @param host the host
+ * @return the data directories for the given {@code host}
+ */
+ protected Future<List<String>> getDataDirectories(String host)
+ {
+ List<String> dataDirs = instancesConfig.instanceFromHost(host).dataDirs();
+ if (dataDirs == null || dataDirs.isEmpty())
+ {
+ logger.error("No data directories are available for host '{}'", host);
+ String errMsg = String.format("No data directories are available for host '%s'", host);
+ return Future.failedFuture(new FileNotFoundException(errMsg));
+ }
+ return Future.succeededFuture(dataDirs);
+ }
+
+ /**
+  * Searches in the list of {@code dataDirs} for the given {@code keyspace} and returns the directory
+  * of the keyspace when it is found, or failure when the {@code keyspace} directory does not exist.
+  * Candidate locations that do not exist, or that are not directories, are skipped; a failure is only
+  * reported when none of the candidates is a valid directory.
+  *
+  * @param dataDirs the list of data directories for a given host
+  * @param keyspace the name of the Cassandra keyspace
+  * @return the directory of the keyspace when it is found, or failure if not found
+  */
+ protected Future<String> findKeyspaceDirectory(List<String> dataDirs, String keyspace)
+ {
+     String errMsg = String.format("Keyspace '%s' does not exist", keyspace);
+     if (dataDirs == null || dataDirs.isEmpty())
+     {
+         // Nothing to search in; report the same failure as an unsuccessful search instead of
+         // failing on candidates.get(0) below
+         return Future.failedFuture(new FileNotFoundException(errMsg));
+     }
+
+     List<Future<String>> candidates = buildPotentialKeyspaceDirectoryList(dataDirs, keyspace);
+     // The candidate futures are all created up front. The recover chain picks the first one, in list
+     // order, that succeeded: a failed candidate falls through to the next, while a succeeded
+     // candidate makes the remaining recover steps no-ops.
+     Future<String> root = candidates.get(0);
+     for (int i = 1; i < candidates.size(); i++)
+     {
+         Future<String> f = candidates.get(i);
+         root = root.recover(v -> f);
+     }
+     // Every candidate failed: replace the last candidate's error with a keyspace-level message
+     return root.recover(t -> Future.failedFuture(new FileNotFoundException(errMsg)));
+ }
+
+ /**
+ * Builds a list of potential directories for the keyspace
+ *
+ * @param dataDirs the list of directories
+ * @param keyspace the Cassandra keyspace
+ * @return a list of potential directories for the keyspace
+ */
+ private List<Future<String>> buildPotentialKeyspaceDirectoryList(List<String> dataDirs, String keyspace)
+ {
+ List<Future<String>> candidates = new ArrayList<>(dataDirs.size() * 2);
+ for (String baseDirectory : dataDirs)
+ {
+ String dir = StringUtils.removeEnd(baseDirectory, File.separator);
+ candidates.add(isValidDirectory(dir + File.separator + keyspace));
+ candidates.add(isValidDirectory(dir + DATA_SUB_DIR + File.separator + keyspace));
+ }
+ return candidates;
+ }
+
+ /**
+ * Finds the most recent directory for the given {@code tableName} in the {@code baseDirectory}. Cassandra
+ * appends the table UUID when a table is created. When a table is dropped and then recreated, a new directory
+ * with the new table UUID is created. For that reason we need to return the most recent directory for the
+ * given table name.
+ *
+ * @param baseDirectory the base directory where we search the table directory
+ * @param tableName the name of the table
+ * @return the most recent directory for the given {@code tableName} in the {@code baseDirectory}
+ */
+ protected Future<String> findTableDirectory(String baseDirectory, String tableName)
+ {
+ return fs.readDir(baseDirectory, tableName + "($|-.*)") // match exact table name or table-.*
+ .compose(list -> getLastModifiedTableDirectory(list, tableName));
+ }
+
+ /**
+ * Constructs the path to the component using the {@code baseDirectory}, {@code snapshotName}, and
+ * {@code componentName} and returns it if it is a valid path to the component, or a failure otherwise.
+ *
+ * @param baseDirectory the base directory where we search the table directory
+ * @param snapshotName the name of the snapshot
+ * @param componentName the name of the component
+ * @return the path to the component if it's valid, a failure otherwise
+ */
+ protected Future<String> findComponent(String baseDirectory, String snapshotName, String componentName)
+ {
+ String componentFilename = StringUtils.removeEnd(baseDirectory, File.separator) +
+ File.separator + SNAPSHOTS_DIR_NAME + File.separator + snapshotName +
+ File.separator + componentName;
+
+ return isValidFilename(componentFilename)
+ .recover(t ->
+ {
+ logger.warn("Snapshot directory {} or component {} does not exist in {}", snapshotName,
+ componentName, componentFilename);
+ String errMsg = String.format("Component '%s' does not exist for snapshot '%s'",
+ componentName, snapshotName);
+ return Future.failedFuture(new FileNotFoundException(errMsg));
+ });
+ }
+
+ /**
+ * @param filename the path to the file
+ * @return a future of the {@code filename} if it exists and is a regular file, a failed future otherwise
+ */
+ protected Future<String> isValidFilename(String filename)
+ {
+ return isValidOfType(filename, FileProps::isRegularFile);
+ }
+
+ /**
+ * @param path the path to the directory
+ * @return a future of the {@code path} if it exists and is a directory, a failed future otherwise
+ */
+ protected Future<String> isValidDirectory(String path)
+ {
+ return isValidOfType(path, FileProps::isDirectory);
+ }
+
+ /**
+ * @param filename the path
+ * @param predicate a predicate that evaluates based on {@link FileProps}
+ * @return a future of the {@code filename} if it exists and {@code predicate} evaluates to true,
+ * a failed future otherwise
+ */
+ protected Future<String> isValidOfType(String filename, Predicate<FileProps> predicate)
+ {
+ return fs.exists(filename)
+ .compose(exists ->
+ {
+ if (!exists)
+ {
+ String errMsg = "File '" + filename + "' does not exist";
+ return Future.failedFuture(new FileNotFoundException(errMsg));
+ }
+ return fs.props(filename)
+ .compose(fileProps ->
+ {
+ if (fileProps == null || !predicate.test(fileProps))
+ {
+ String errMsg = "File '" + filename + "' does not exist";
+ return Future.failedFuture(new FileNotFoundException(errMsg));
+ }
+ return Future.succeededFuture(filename);
+ });
+ });
+ }
+
+ /**
+ * @param fileList a list of files
+ * @param tableName the name of the Cassandra table
+ * @return a future with the last modified directory from the list, or a failed future when there are no directories
+ */
+ protected Future<String> getLastModifiedTableDirectory(List<String> fileList, String tableName)
+ {
+ if (fileList.size() == 0)
+ {
+ String errMsg = String.format("Table '%s' does not exist", tableName);
+ return Future.failedFuture(new FileNotFoundException(errMsg));
+ }
+
+ //noinspection rawtypes
+ List<Future> futures = fileList.stream()
+ .map(fs::props)
+ .collect(Collectors.toList());
+
+ Promise<String> promise = Promise.promise();
+ CompositeFuture.all(futures)
+ .onFailure(promise::fail)
+ .onSuccess(ar ->
+ {
+ String directory = IntStream.range(0, fileList.size())
+ .mapToObj(i -> Pair.of(fileList.get(i),
+ ar.<FileProps>resultAt(i)))
+ .filter(pair -> pair.getRight().isDirectory())
+ .max(Comparator.comparingLong(pair -> pair.getRight()
+ .lastModifiedTime()))
+ .map(Pair::getLeft)
+ .orElse(null);
+
+ if (directory == null)
+ {
+ String errMsg = String.format("Table '%s' does not exist", tableName);
+ promise.fail(new FileNotFoundException(errMsg));
+ }
+ else
+ {
+ promise.complete(directory);
+ }
+ });
+ return promise.future();
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/CachedFilePathBuilder.java b/src/main/java/org/apache/cassandra/sidecar/utils/CachedFilePathBuilder.java
deleted file mode 100644
index 6468f9b..0000000
--- a/src/main/java/org/apache/cassandra/sidecar/utils/CachedFilePathBuilder.java
+++ /dev/null
@@ -1,217 +0,0 @@
-package org.apache.cassandra.sidecar.utils;
-
-import java.io.FileNotFoundException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.security.KeyException;
-import java.util.Collection;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-
-/**
- * Path builder that caches intermediate paths
- */
-public class CachedFilePathBuilder extends FilePathBuilder
-{
- private static final Logger logger = LoggerFactory.getLogger(CachedFilePathBuilder.class);
- private final CacheLoader<Key, String> loader = new PathCacheLoader();
- private final LoadingCache<Key, String> sstableCache = getCacheBuilder();
- private final LoadingCache<Key, String> snapshotCache = getCacheBuilder();
- private final LoadingCache<Key, String> tableCache = getCacheBuilder();
- private final LoadingCache<Key, String> keyspaceCache = getCacheBuilder();
-
- private LoadingCache<Key, String> getCacheBuilder()
- {
- return CacheBuilder.newBuilder().maximumSize(10000).refreshAfterWrite(5, TimeUnit.MINUTES).build(loader);
- }
-
- @Inject
- public CachedFilePathBuilder(final Collection<String> dataDirs)
- {
- super(dataDirs);
- }
-
- public Path build(String keyspace, String table, String snapshot, String component) throws FileNotFoundException
- {
- try
- {
- return Paths.get(sstableCache.get(new Key.Builder().setKeyspace(keyspace).setTable(table)
- .setSnapshot(snapshot).setComponent(component).build()));
- }
- catch (Throwable t)
- {
- if (ExceptionUtils.getRootCause(t) instanceof FileNotFoundException)
- {
- throw (FileNotFoundException) ExceptionUtils.getRootCause(t);
- }
- else
- {
- logger.error("Unexpected error while building path ", t);
- throw new RuntimeException("Error loading value from path cache");
- }
- }
- }
-
- /**
- * Cache Loader for guava cache storing path to files
- */
- public class PathCacheLoader extends CacheLoader<Key, String>
- {
- @Override
- public String load(Key key) throws FileNotFoundException, KeyException, ExecutionException
- {
- switch (key.type())
- {
- case KEYSPACE_TABLE_SNAPSHOT_COMPONENT:
- return addSSTableComponentToPath(key.component(), snapshotCache.get(new Key.Builder()
- .setKeyspace(key.keyspace()).setTable(key.table()).setSnapshot(key.snapshot()).build()));
- case KEYSPACE_TABLE_SNAPSHOT:
- return addSnapshotToPath(key.snapshot(), tableCache.get(new Key.Builder()
- .setKeyspace(key.keyspace()).setTable(key.table()).build()));
- case KEYSPACE_TABLE:
- return addTableToPath(key.table(), keyspaceCache.get(new Key.Builder().setKeyspace(key.keyspace())
- .build()));
- case JUST_KEYSPACE:
- return addKeyspaceToPath(key.keyspace());
- default:
- throw new KeyException();
- }
- }
- }
-
- /**
- * Key to retrieve path information from cache
- */
- public static class Key
- {
- private final String keyspace;
- private final String table;
- private final String snapshot;
- private final String component;
- private final KeyType type;
-
- private Key(String keyspace, String table, String snapshot, String component, KeyType type)
- {
- this.keyspace = keyspace;
- this.table = table;
- this.snapshot = snapshot;
- this.component = component;
- this.type = type;
- }
-
- public String keyspace() throws KeyException
- {
- return Optional.ofNullable(keyspace).orElseThrow(KeyException::new);
- }
-
- public String table() throws KeyException
- {
- return Optional.ofNullable(table).orElseThrow(KeyException::new);
- }
-
- public String snapshot() throws KeyException
- {
- return Optional.ofNullable(snapshot).orElseThrow(KeyException::new);
- }
-
- public String component() throws KeyException
- {
- return Optional.ofNullable(component).orElseThrow(KeyException::new);
- }
-
- public KeyType type()
- {
- return type;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
- Key key = (Key) o;
- return type == key.type &&
- Objects.equals(keyspace, key.keyspace) &&
- Objects.equals(table, key.table) &&
- Objects.equals(snapshot, key.snapshot) &&
- Objects.equals(component, key.component);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(keyspace, table, snapshot, component, type);
- }
-
- /**
- * Builder class for Key
- */
- public static class Builder
- {
- private String keyspace;
- private String table;
- private String snapshot;
- private String component;
- private int length;
-
- public Builder setKeyspace(String keyspace)
- {
- length++;
- this.keyspace = keyspace;
- return this;
- }
-
- public Builder setTable(String table)
- {
- length++;
- this.table = table;
- return this;
- }
-
- public Builder setSnapshot(String snapshot)
- {
- length++;
- this.snapshot = snapshot;
- return this;
- }
-
- public Builder setComponent(String component)
- {
- length++;
- this.component = component;
- return this;
- }
-
- public CachedFilePathBuilder.Key build()
- {
- return new CachedFilePathBuilder.Key(keyspace, table, snapshot, component,
- KeyType.values()[length - 1]);
- }
- }
- }
-
- /**
- * Enum to hold types of keys created
- */
- public enum KeyType
- {
- JUST_KEYSPACE, KEYSPACE_TABLE, KEYSPACE_TABLE_SNAPSHOT, KEYSPACE_TABLE_SNAPSHOT_COMPONENT
- }
-}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/FilePathBuilder.java b/src/main/java/org/apache/cassandra/sidecar/utils/FilePathBuilder.java
deleted file mode 100644
index 334f58e..0000000
--- a/src/main/java/org/apache/cassandra/sidecar/utils/FilePathBuilder.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package org.apache.cassandra.sidecar.utils;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Collection;
-
-import javax.validation.constraints.NotNull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Builds file path after verifying it exists
- */
-public abstract class FilePathBuilder
-{
- private static final Logger logger = LoggerFactory.getLogger(FilePathBuilder.class);
- private static final String dataSubDir = "/data";
- private final Collection<String> dataDirs;
-
- public FilePathBuilder(@NotNull final Collection<String> dataDirs)
- {
- this.dataDirs = dataDirs;
- }
-
- public abstract Path build(String keyspace, String table, String snapshot, String component)
- throws FileNotFoundException;
-
- String addKeyspaceToPath(String keyspace) throws FileNotFoundException
- {
- for (String dir : dataDirs)
- {
- StringBuilder path = new StringBuilder(dir);
- if (addFileToPathIfPresent(path, keyspace, true))
- {
- return path.toString();
- }
-
- if (dir.endsWith(dataSubDir))
- {
- continue;
- }
-
- // check in "data" sub directory
- if (addFileToPathIfPresent(path.append(dataSubDir), keyspace, true))
- {
- return path.toString();
- }
- }
- throw new FileNotFoundException("Keyspace " + keyspace + " does not exist");
- }
-
- String addTableToPath(String table, String path) throws FileNotFoundException
- {
- final StringBuilder modifiedPath = new StringBuilder(path);
- if (addFileToPathIfPresent(modifiedPath, table, false))
- {
- return modifiedPath.toString();
- }
- throw new FileNotFoundException("Table " + table + " not found, path searched: " + path);
- }
-
- String addSnapshotToPath(String snapshot, String path) throws FileNotFoundException
- {
- final StringBuilder modifiedPath = new StringBuilder(path);
- if (addFileToPathIfPresent(modifiedPath.append("/snapshots"), snapshot, true))
- {
- return modifiedPath.toString();
- }
- throw new FileNotFoundException("Snapshot " + snapshot + " not found, path searched: " + path);
- }
-
- String addSSTableComponentToPath(String component, String path) throws FileNotFoundException
- {
- final StringBuilder modifiedPath = new StringBuilder(path);
- if (addFileToPathIfPresent(modifiedPath, component, true))
- {
- return modifiedPath.toString();
- }
- throw new FileNotFoundException("Component " + component + " not found, path searched: " + path);
- }
-
- private boolean addFileToPathIfPresent(StringBuilder path, String file, boolean checkEqual)
- throws FileNotFoundException
- {
- final Path fileDir = Paths.get(path.toString());
- if (!checkDirExists(fileDir))
- {
- throw new FileNotFoundException(fileDir + " directory empty or does not exist!");
- }
-
- try
- {
- Path finalPath = null;
- try (final DirectoryStream<Path> dirEntries = Files.newDirectoryStream(fileDir))
- {
- for (Path entry : dirEntries)
- {
- final Path filePath = entry.getFileName();
- if (filePath == null)
- {
- continue;
- }
- final String fileName = filePath.toString();
- if (fileName.equals(file) || (!checkEqual && fileName.startsWith(file + "-")))
- {
- if (finalPath == null
- || Files.getLastModifiedTime(entry).compareTo(Files.getLastModifiedTime(finalPath)) > 0)
- {
- finalPath = entry;
- }
- }
- }
- if (finalPath != null)
- {
- final Path finalFilePath = finalPath.getFileName();
- if (finalFilePath == null)
- {
- return false;
- }
- final String finalFileName = finalFilePath.toString();
- path.append('/').append(finalFileName);
- return true;
- }
- }
- }
- catch (IOException e)
- {
- logger.error("Error listing files in path {}, could not add file {} to path", path, file, e);
- throw new RuntimeException("Failed to list files in path " + path);
- }
- return false;
- }
-
- private boolean checkDirExists(final Path path)
- {
- final File file = new File(path.toString());
- return file.exists() && file.isDirectory();
- }
-}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java b/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
index b90f231..131320e 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
@@ -1,24 +1,27 @@
package org.apache.cassandra.sidecar.utils;
-import java.io.File;
import java.time.Duration;
import java.time.Instant;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
import com.google.common.util.concurrent.SidecarRateLimiter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.handler.HttpException;
import org.apache.cassandra.sidecar.Configuration;
+import org.apache.cassandra.sidecar.exceptions.RangeException;
import org.apache.cassandra.sidecar.models.HttpResponse;
import org.apache.cassandra.sidecar.models.Range;
-import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE;
+import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS;
/**
* General handler for serving files
@@ -27,87 +30,172 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS;
public class FileStreamer
{
private static final Logger LOGGER = LoggerFactory.getLogger(FileStreamer.class);
- private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
- new ThreadFactoryBuilder().setNameFormat("acquirePermit").setDaemon(true).build());
- private final Duration delay;
- private final Duration timeout;
+ private static final long DEFAULT_RATE_LIMIT_STREAM_REQUESTS_PER_SECOND = Long.MAX_VALUE;
+ private final Vertx vertx;
+ private final Configuration config;
private final SidecarRateLimiter rateLimiter;
@Inject
- public FileStreamer(Configuration config, SidecarRateLimiter rateLimiter)
+ public FileStreamer(Vertx vertx, Configuration config, SidecarRateLimiter rateLimiter)
{
+ this.vertx = vertx;
+ this.config = config;
this.rateLimiter = rateLimiter;
- this.delay = Duration.ofSeconds(config.getThrottleDelayInSeconds());
- this.timeout = Duration.ofSeconds(config.getThrottleTimeoutInSeconds());
}
- public void stream(final HttpResponse resp, final File file)
+ /**
+ * Streams the file at {@code filename} with length {@code fileLength} for the (optionally) requested
+ * {@code rangeHeader} using the provided {@code response}.
+ *
+ * @param response the response to use
+ * @param filename the path to the file to serve
+ * @param fileLength the size of the file to serve
+ * @param rangeHeader (optional) a string representing the requested range for the file
+ * @return a future with the result of the streaming
+ */
+ public Future<Void> stream(HttpResponse response, String filename, long fileLength, String rangeHeader)
{
- stream(resp, file, new Range(0, file.length() - 1, file.length()));
+ return parseRangeHeader(rangeHeader, fileLength)
+ .compose(range -> stream(response, filename, fileLength, range));
}
- public void stream(final HttpResponse resp, final File file, final Range range)
+ /**
+ * Streams the file at {@code filename} with length {@code fileLength} for the requested
+ * {@code range} using the provided {@code response}.
+ *
+ * @param response the response to use
+ * @param filename the path to the file to serve
+ * @param fileLength the size of the file to serve
+ * @param range the range to stream
+ * @return a future with the result of the streaming
+ */
+ public Future<Void> stream(HttpResponse response, String filename, long fileLength, Range range)
{
- if (!file.exists() || !file.isFile())
- {
- resp.setNotFoundStatus("File does not exist or it is not a normal file");
- return;
- }
- if (file.length() == 0)
- {
- resp.setBadRequestStatus("File is empty");
- return;
- }
- acquireAndSend(resp, file, range);
+ Promise<Void> promise = Promise.promise();
+ acquireAndSend(response, filename, fileLength, range, Instant.now(), promise);
+ return promise.future();
}
- private void acquireAndSend(HttpResponse response, File file, Range range)
+ /**
+ * Send the file if rate-limiting is disabled or when it successfully acquires a permit from the
+ * {@link SidecarRateLimiter}.
+ *
+ * @param response the response to use
+ * @param filename the path to the file to serve
+ * @param fileLength the size of the file to serve
+ * @param range the range to stream
+ * @param startTime the start time of this request
+ * @param promise a promise for the stream
+ */
+ private void acquireAndSend(HttpResponse response, String filename, long fileLength, Range range, Instant startTime,
+ Promise<Void> promise)
{
- acquireAndSend(response, file, range, Instant.now());
+ if (!isRateLimited() || acquire(response, filename, fileLength, range, startTime, promise))
+ {
+ // Stream data if rate limiting is disabled or if a permit was acquired
+ LOGGER.info("Streaming range {} for file {} to client {}. Instance: {}", range, filename,
+ response.remoteAddress(), response.host());
+ response.sendFile(filename, fileLength, range)
+ .onSuccess(v ->
+ {
+ LOGGER.debug("Streamed file {} successfully to client {}. Instance: {}", filename,
+ response.remoteAddress(), response.host());
+ promise.complete();
+ })
+ .onFailure(promise::fail);
+ }
}
/**
- * If permit becomes available within a short time, retry immediately
+ * Acquires a permit from the {@link SidecarRateLimiter} if it can be acquired immediately without
+ * delay. Otherwise, it will retry acquiring the permit later in the future until it exhausts the
+ * retry timeout, in which case it will ask the client to retry later in the future.
+ *
+ * @param response the response to use
+ * @param filename the path to the file to serve
+ * @param fileLength the size of the file to serve
+ * @param range the range to stream
+ * @param startTime the start time of this request
+ * @param promise a promise for the stream
+ * @return {@code true} if the permit was acquired, {@code false} otherwise
*/
- private void acquireAndSend(HttpResponse response, File file, Range range, Instant startTime)
+ private boolean acquire(HttpResponse response, String filename, long fileLength, Range range, Instant startTime,
+ Promise<Void> promise)
{
- while (!rateLimiter.tryAcquire())
+ if (rateLimiter.tryAcquire())
+ return true;
+
+ long microsToWait;
+ if (checkRetriesExhausted(startTime))
+ {
+ LOGGER.error("Retries for acquiring permit exhausted for client {}. Instance: {}", response.remoteAddress(),
+ response.host());
+ promise.fail(new HttpException(TOO_MANY_REQUESTS.code(), "Retry exhausted"));
+ }
+ else if ((microsToWait = rateLimiter.queryEarliestAvailable(0L))
+ < TimeUnit.SECONDS.toMicros(config.getThrottleDelayInSeconds()))
{
- if (checkRetriesExhausted(startTime))
- {
- LOGGER.error("Retries for acquiring permit exhausted!");
- response.setTooManyRequestsStatus();
- return;
- }
-
- final long microsToWait = rateLimiter.queryEarliestAvailable(0L);
- if (microsToWait <= 0) // immediately retry
- {
- continue;
- }
-
- if (TimeUnit.MICROSECONDS.toNanos(microsToWait) >= delay.getNano())
- {
- response.setRetryAfterHeader(microsToWait);
- }
- else
- {
- retryStreaming(response, file, range, startTime, microsToWait);
- }
- return;
+ microsToWait = Math.max(0, microsToWait);
+ // Vert.x rejects timer delays below 1 ms, so the timer delay below is rounded up to at least 1 ms
+ LOGGER.debug("Retrying streaming after {} micros for client {}. Instance: {}", microsToWait,
+ response.remoteAddress(), response.host());
+ vertx.setTimer(Math.max(1L, MICROSECONDS.toMillis(microsToWait)),
+ t -> acquireAndSend(response, filename, fileLength, range, startTime, promise));
+ }
+ else
+ {
+ LOGGER.debug("Asking client {} to retry after {} micros. Instance: {}", response.remoteAddress(),
+ microsToWait, response.host());
+ response.setRetryAfterHeader(microsToWait);
+ promise.fail(new HttpException(TOO_MANY_REQUESTS.code(), "Ask client to retry later"));
}
- LOGGER.info("File {} streamed from path {}", file.getName(), file.getAbsolutePath());
- response.sendFile(file, range);
+ return false;
}
+ /**
+ * @return true if this request is rate-limited, false otherwise
+ */
+ private boolean isRateLimited()
+ {
+ return config.getRateLimitStreamRequestsPerSecond() != DEFAULT_RATE_LIMIT_STREAM_REQUESTS_PER_SECOND;
+ }
+
+ /**
+ * @param startTime the request start time
+ * @return true if we exhausted the retries, false otherwise
+ */
private boolean checkRetriesExhausted(Instant startTime)
{
- return startTime.plus(timeout).isBefore(Instant.now());
+ return startTime.plus(Duration.ofSeconds(config.getThrottleTimeoutInSeconds()))
+ .isBefore(Instant.now());
}
- private void retryStreaming(HttpResponse response, File file, Range range, Instant startTime, long microsToSleep)
+ /**
+ * Returns the requested range for the request, or the entire range if {@code rangeHeader} is null
+ *
+ * @param rangeHeader The range header from the request
+ * @param fileLength The length of the file
+ * @return a succeeded future when the parsing is successful, a failed future when the range parsing fails
+ */
+ private Future<Range> parseRangeHeader(String rangeHeader, long fileLength)
{
- SCHEDULER.schedule(() -> acquireAndSend(response, file, range, startTime), microsToSleep, MICROSECONDS);
+ Range fr = new Range(0, fileLength - 1, fileLength);
+ if (rangeHeader == null)
+ return Future.succeededFuture(fr);
+
+ try
+ {
+ // sidecar does not support multiple ranges as of now
+ final Range hr = Range.parseHeader(rangeHeader, fileLength);
+ Range intersect = fr.intersect(hr);
+ LOGGER.debug("Calculated range {} for streaming", intersect);
+ return Future.succeededFuture(intersect);
+ }
+ catch (IllegalArgumentException | RangeException | UnsupportedOperationException e)
+ {
+ LOGGER.error(String.format("Failed to parse header '%s'", rangeHeader), e);
+ return Future.failedFuture(new HttpException(REQUESTED_RANGE_NOT_SATISFIABLE.code()));
+ }
}
}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java b/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java
index 0ae7fe1..f74d08e 100644
--- a/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java
+++ b/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java
@@ -35,20 +35,6 @@ public class InstanceMetadataFetcher
: instancesConfig.instanceFromId(instanceId).delegate();
}
- public FilePathBuilder getPathBuilder(String host)
- {
- return host == null
- ? getFirstInstance().pathBuilder()
- : instancesConfig.instanceFromHost(host).pathBuilder();
- }
-
- public FilePathBuilder getPathBuilder(Integer instanceId)
- {
- return instanceId == null
- ? getFirstInstance().pathBuilder()
- : instancesConfig.instanceFromId(instanceId).pathBuilder();
- }
-
private InstanceMetadata getFirstInstance()
{
if (instancesConfig.instances().isEmpty())
diff --git a/src/test/java/org/apache/cassandra/sidecar/FilePathBuilderTest.java b/src/test/java/org/apache/cassandra/sidecar/FilePathBuilderTest.java
deleted file mode 100644
index 10ffafd..0000000
--- a/src/test/java/org/apache/cassandra/sidecar/FilePathBuilderTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-package org.apache.cassandra.sidecar;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Collections;
-
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.util.Modules;
-import org.apache.cassandra.sidecar.cluster.InstancesConfig;
-import org.apache.cassandra.sidecar.utils.CachedFilePathBuilder;
-import org.apache.cassandra.sidecar.utils.FilePathBuilder;
-import org.assertj.core.api.Assertions;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-/**
- * FilePathBuilderTest
- */
-public class FilePathBuilderTest
-{
- private static final String expectedFilePath = "src/test/resources/instance1/data/TestKeyspace" +
- "/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots/TestSnapshot" +
- "/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- private static FilePathBuilder pathBuilder;
-
- @BeforeAll
- public static void setUp()
- {
- Injector injector = Guice.createInjector(Modules.override(new MainModule()).with(new TestModule()));
- pathBuilder = injector.getInstance(InstancesConfig.class).instances().get(0).pathBuilder();
- }
-
- @Test
- public void testRoute() throws IOException
- {
- final String keyspace = "TestKeyspace";
- final String table = "TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
- final String snapshot = "TestSnapshot";
- final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- Path filePath = pathBuilder.build(keyspace, table, snapshot, component);
- assertEquals(expectedFilePath, filePath.toString());
- }
-
- @Test
- public void testKeyspaceNotFound()
- {
- final String keyspace = "random";
- final String table = "TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
- final String snapshot = "TestSnapshot";
- final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- FileNotFoundException thrownException = assertThrows(FileNotFoundException.class, () ->
- {
- pathBuilder.build(keyspace, table, snapshot, component);
- });
- String msg = "Keyspace random does not exist";
- assertEquals(msg, thrownException.getMessage());
- }
-
- @Test
- public void testTableNotFound()
- {
- final String keyspace = "TestKeyspace";
- final String table = "random";
- final String snapshot = "TestSnapshot";
- final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- FileNotFoundException thrownException = assertThrows(FileNotFoundException.class, () ->
- {
- pathBuilder.build(keyspace, table, snapshot, component);
- });
- String msg = "Table random not found, path searched: src/test/resources/instance1/data/TestKeyspace";
- assertEquals(msg, thrownException.getMessage());
- }
-
- @Test
- public void testSnapshotNotFound()
- {
- final String keyspace = "TestKeyspace";
- final String table = "TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
- final String snapshot = "random";
- final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- FileNotFoundException thrownException = assertThrows(FileNotFoundException.class, () ->
- {
- pathBuilder.build(keyspace, table, snapshot, component);
- });
- String msg = "Snapshot random not found, path searched: src/test/resources/instance1/data/TestKeyspace" +
- "/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
- assertEquals(msg, thrownException.getMessage());
- }
-
- @Test
- public void testPartialTableName() throws FileNotFoundException
- {
- final String keyspace = "TestKeyspace";
- final String table = "TestTable";
- final String snapshot = "TestSnapshot";
- final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- Path filePath = pathBuilder.build(keyspace, table, snapshot, component);
- assertEquals(expectedFilePath, filePath.toString());
- }
-
- @Test
- public void testEmptyDataDir() throws IOException
- {
- String dataDir = new File("./").getCanonicalPath() + "/src/test/resources/instance";
-
- final String keyspace = "TestKeyspace";
- final String table = "TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
- final String snapshot = "TestSnapshot";
- final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-
- FilePathBuilder pathBuilder = new CachedFilePathBuilder(Collections.singletonList(dataDir));
- FileNotFoundException thrownException = assertThrows(FileNotFoundException.class, () ->
- {
- pathBuilder.build(keyspace, table, snapshot, component);
- });
- String msg = "directory empty or does not exist!";
- Assertions.assertThat(thrownException.getMessage()).contains(msg);
- }
-}
diff --git a/src/test/java/org/apache/cassandra/sidecar/LoggerHandlerInjectionTest.java b/src/test/java/org/apache/cassandra/sidecar/LoggerHandlerInjectionTest.java
index 774f880..b31b29a 100644
--- a/src/test/java/org/apache/cassandra/sidecar/LoggerHandlerInjectionTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/LoggerHandlerInjectionTest.java
@@ -89,9 +89,12 @@ public class LoggerHandlerInjectionTest
final CountDownLatch closeLatch = new CountDownLatch(1);
server.close(res -> closeLatch.countDown());
vertx.close();
- if (closeLatch.await(60, TimeUnit.SECONDS)) {
+ if (closeLatch.await(60, TimeUnit.SECONDS))
+ {
logger.info("Close event received before timeout.");
- } else {
+ }
+ else
+ {
logger.error("Close event timed out.");
}
}
diff --git a/src/test/java/org/apache/cassandra/sidecar/StreamSSTableComponentTest.java b/src/test/java/org/apache/cassandra/sidecar/StreamSSTableComponentTest.java
index fc03af9..889baf9 100644
--- a/src/test/java/org/apache/cassandra/sidecar/StreamSSTableComponentTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/StreamSSTableComponentTest.java
@@ -69,9 +69,9 @@ public class StreamSSTableComponentTest
void testRoute(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.as(BodyCodec.buffer())
.send(context.succeeding(response -> context.verify(() ->
{
@@ -85,9 +85,9 @@ public class StreamSSTableComponentTest
void testKeyspaceNotFound(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/random/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/random/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.send(context.succeeding(response -> context.verify(() ->
{
assertThat(response.statusCode()).isEqualTo(NOT_FOUND.code());
@@ -99,9 +99,9 @@ public class StreamSSTableComponentTest
void testSnapshotNotFound(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/random/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.send(context.succeeding(response -> context.verify(() ->
{
assertThat(response.statusCode()).isEqualTo(NOT_FOUND.code());
@@ -113,13 +113,13 @@ public class StreamSSTableComponentTest
void testForbiddenKeyspace(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/system/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/system/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.send(context.succeeding(response -> context.verify(() ->
{
assertThat(response.statusCode()).isEqualTo(FORBIDDEN.code());
- assertThat(response.statusMessage()).isEqualTo("system keyspace is forbidden");
+ assertThat(response.statusMessage()).isEqualTo(FORBIDDEN.reasonPhrase());
context.completeNow();
})));
}
@@ -128,13 +128,13 @@ public class StreamSSTableComponentTest
void testIncorrectKeyspaceFormat(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/k*s/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/k*s/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.send(context.succeeding(response -> context.verify(() ->
{
assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code());
- assertThat(response.statusMessage()).isEqualTo("Invalid path params found");
+ assertThat(response.statusMessage()).isEqualTo(BAD_REQUEST.reasonPhrase());
context.completeNow();
})));
}
@@ -143,13 +143,13 @@ public class StreamSSTableComponentTest
void testIncorrectComponentFormat(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data...db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.send(context.succeeding(response -> context.verify(() ->
{
assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code());
- assertThat(response.statusMessage()).isEqualTo("Invalid path params found");
+ assertThat(response.statusMessage()).isEqualTo(BAD_REQUEST.reasonPhrase());
context.completeNow();
})));
}
@@ -158,13 +158,13 @@ public class StreamSSTableComponentTest
void testAccessDeniedToCertainComponents(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Digest.crc32d";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.send(context.succeeding(response -> context.verify(() ->
{
assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code());
- assertThat(response.statusMessage()).isEqualTo("Invalid path params found");
+ assertThat(response.statusMessage()).isEqualTo(BAD_REQUEST.reasonPhrase());
context.completeNow();
})));
}
@@ -173,9 +173,9 @@ public class StreamSSTableComponentTest
void testPartialTableName(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/TestKeyspace/table/TestTable/snapshot/TestSnapshot/component" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable/snapshots/TestSnapshot/component" +
"/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.putHeader("Range", "bytes=0-")
.as(BodyCodec.buffer())
.send(context.succeeding(response -> context.verify(() ->
@@ -190,9 +190,9 @@ public class StreamSSTableComponentTest
void testInvalidRange(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.putHeader("Range", "bytes=4-3")
.send(context.succeeding(response -> context.verify(() ->
{
@@ -205,9 +205,9 @@ public class StreamSSTableComponentTest
void testRangeExceeds(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.putHeader("Range", "bytes=5-9")
.send(context.succeeding(response -> context.verify(() ->
{
@@ -220,9 +220,9 @@ public class StreamSSTableComponentTest
void testPartialRangeExceeds(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.putHeader("Range", "bytes=5-")
.send(context.succeeding(response -> context.verify(() ->
{
@@ -235,9 +235,9 @@ public class StreamSSTableComponentTest
void testRangeBoundaryExceeds(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.putHeader("Range", "bytes=0-999999")
.as(BodyCodec.buffer())
.send(context.succeeding(response -> context.verify(() ->
@@ -252,9 +252,9 @@ public class StreamSSTableComponentTest
void testPartialRangeStreamed(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.putHeader("Range", "bytes=0-2") // 3 bytes streamed
.as(BodyCodec.buffer())
.send(context.succeeding(response -> context.verify(() ->
@@ -269,9 +269,9 @@ public class StreamSSTableComponentTest
void testSuffixRange(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.putHeader("Range", "bytes=-2") // last 2 bytes streamed
.as(BodyCodec.buffer())
.send(context.succeeding(response -> context.verify(() ->
@@ -286,9 +286,9 @@ public class StreamSSTableComponentTest
void testSuffixRangeExceeds(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.putHeader("Range", "bytes=-5")
.send(context.succeeding(response -> context.verify(() ->
{
@@ -301,9 +301,9 @@ public class StreamSSTableComponentTest
void testInvalidRangeUnit(VertxTestContext context)
{
WebClient client = WebClient.create(vertx);
- String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
.putHeader("Range", "bits=0-2")
.send(context.succeeding(response -> context.verify(() ->
{
@@ -317,9 +317,9 @@ public class StreamSSTableComponentTest
{
WebClient client = WebClient.create(vertx);
String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/" +
- "snapshot/TestSnapshot/component/" +
+ "snapshots/TestSnapshot/component/" +
"TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
- client.get(config.getPort(), "localhost", "/api/v1/stream/instance/2" + testRoute)
+ client.get(config.getPort(), "localhost", "/api/v1/instance/2" + testRoute)
.as(BodyCodec.buffer())
.send(context.succeeding(response -> context.verify(() ->
{
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 892dcd7..708b684 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -28,13 +28,14 @@ import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.FileSystem;
import org.apache.cassandra.sidecar.cluster.InstancesConfig;
import org.apache.cassandra.sidecar.cluster.InstancesConfigImpl;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
import org.apache.cassandra.sidecar.common.MockCassandraFactory;
-import org.apache.cassandra.sidecar.utils.CachedFilePathBuilder;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
@@ -100,7 +101,6 @@ public class TestModule extends AbstractModule
when(instanceMeta.id()).thenReturn(id);
when(instanceMeta.host()).thenReturn(host);
when(instanceMeta.port()).thenReturn(6475);
- when(instanceMeta.pathBuilder()).thenReturn(new CachedFilePathBuilder(Collections.singletonList(dataDir)));
when(instanceMeta.dataDirs()).thenReturn(Collections.singletonList(dataDir));
CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class);
@@ -110,6 +110,13 @@ public class TestModule extends AbstractModule
return instanceMeta;
}
+ @Provides
+ @Singleton
+ public FileSystem fileSystem(Vertx vertx)
+ {
+ return vertx.fileSystem();
+ }
+
/**
* The Mock factory is used for testing purposes, enabling us to test all failures and possible results
* @return
diff --git a/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java b/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java
index d77c8e5..65f39b9 100644
--- a/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java
@@ -67,7 +67,7 @@ public class ThrottleTest
@Test
void testStreamRequestsThrottled() throws Exception
{
- String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+ String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
"/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
for (int i = 0; i < 20; i++)
@@ -89,7 +89,7 @@ public class ThrottleTest
private void unblockingClientRequest(String route)
{
WebClient client = WebClient.create(vertx);
- client.get(config.getPort(), "localhost", "/api/v1/stream" + route)
+ client.get(config.getPort(), "localhost", "/api/v1" + route)
.as(BodyCodec.buffer())
.send(resp ->
{
@@ -101,7 +101,7 @@ public class ThrottleTest
{
WebClient client = WebClient.create(vertx);
CompletableFuture<HttpResponse> future = new CompletableFuture<>();
- client.get(config.getPort(), "localhost", "/api/v1/stream" + route)
+ client.get(config.getPort(), "localhost", "/api/v1" + route)
.as(BodyCodec.buffer())
.send(resp -> future.complete(resp.result()));
return future.get();
diff --git a/src/test/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java b/src/test/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java
new file mode 100644
index 0000000..0aa2e3e
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java
@@ -0,0 +1,672 @@
+package org.apache.cassandra.sidecar.snapshots;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.rules.TemporaryFolder;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.handler.HttpException;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.data.StreamSSTableComponentRequest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.from;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+abstract class AbstractSnapshotPathBuilderTest
+{
+ @TempDir
+ File dataDir0;
+
+ @TempDir
+ File dataDir1;
+
+ SnapshotPathBuilder instance;
+ Vertx vertx = Vertx.vertx();
+
+ /**
+  * Creates the directory fixture used by the tests and the {@link SnapshotPathBuilder} under test.
+  *
+  * <p>Three mocked instances are registered with the mocked {@link InstancesConfig}:
+  * {@code localhost} (data directories {@code dataDir0} and {@code dataDir1}),
+  * {@code invalidDataDirInstance} (a single data directory path that this fixture never creates) and
+  * {@code emptyDataDirInstance} (an empty list of data directories).
+  *
+  * <p>Every {@code mkdirs()} / {@code createNewFile()} result is checked with {@code isTrue()}; a bare
+  * {@code assertThat(boolean)} verifies nothing, so a fixture that could not be created would otherwise
+  * only show up later as an unrelated test failure.
+  *
+  * @throws IOException if one of the fixture files cannot be created
+  */
+ @BeforeEach
+ void setup() throws IOException
+ {
+     InstancesConfig mockInstancesConfig = mock(InstancesConfig.class);
+     InstanceMetadata mockInstanceMeta = mock(InstanceMetadata.class);
+     InstanceMetadata mockInvalidDataDirInstanceMeta = mock(InstanceMetadata.class);
+     InstanceMetadata mockEmptyDataDirInstanceMeta = mock(InstanceMetadata.class);
+
+     when(mockInstancesConfig.instanceFromHost("localhost")).thenReturn(mockInstanceMeta);
+     when(mockInstanceMeta.dataDirs()).thenReturn(Arrays.asList(dataDir0.getAbsolutePath(),
+                                                                dataDir1.getAbsolutePath()));
+
+     when(mockInstancesConfig.instanceFromHost("invalidDataDirInstance")).thenReturn(mockInvalidDataDirInstanceMeta);
+     String invalidDirPath = dataDir0.getParentFile().getAbsolutePath() + "/invalid-data-dir";
+     when(mockInvalidDataDirInstanceMeta.dataDirs()).thenReturn(Arrays.asList(invalidDirPath));
+
+     when(mockInstancesConfig.instanceFromHost("emptyDataDirInstance")).thenReturn(mockEmptyDataDirInstanceMeta);
+     when(mockEmptyDataDirInstanceMeta.dataDirs()).thenReturn(Arrays.asList());
+
+     // Create some files and directories
+     assertThat(new File(dataDir0, "not_a_keyspace_dir").createNewFile()).isTrue();
+     assertThat(new File(dataDir0, "ks1/table1/snapshots/backup.2022-03-17-04-PDT/not_a_file.db").mkdirs()).isTrue();
+     assertThat(new File(dataDir0, "ks1/not_a_table_dir").createNewFile()).isTrue();
+     assertThat(new File(dataDir0, "ks1/table1/snapshots/not_a_snapshot_dir").createNewFile()).isTrue();
+     assertThat(new File(dataDir0, "data/ks2/table2/snapshots/ea823202-a62c-4603-bb6a-4e15d79091cd").mkdirs())
+     .isTrue();
+
+     assertThat(new File(dataDir1, "ks3/table3/snapshots/snapshot1").mkdirs()).isTrue();
+
+     // this is a different table with the same "table4" prefix
+     assertThat(new File(dataDir1, "data/ks4/table4abc-a72c8740a57611ec935db766a70c44a1" +
+                                   "/snapshots/this_is_a_valid_snapshot_name_i_❤_u").mkdirs()).isTrue();
+
+     // table && table-<TABLE_UUID>
+     assertThat(new File(dataDir0, "ks1/a_table/snapshots/a_snapshot/").mkdirs()).isTrue();
+     assertThat(new File(dataDir0, "ks1/a_table-a72c8740a57611ec935db766a70c44a1/snapshots/a_snapshot/").mkdirs())
+     .isTrue();
+
+     // create some files inside snapshot backup.2022-03-17-04-PDT
+     assertThat(new File(dataDir0, "ks1/table1/snapshots/backup.2022-03-17-04-PDT/data.db").createNewFile())
+     .isTrue();
+     assertThat(new File(dataDir0, "ks1/table1/snapshots/backup.2022-03-17-04-PDT/index.db").createNewFile())
+     .isTrue();
+     assertThat(new File(dataDir0, "ks1/table1/snapshots/backup.2022-03-17-04-PDT/nb-203-big-TOC.txt")
+                .createNewFile()).isTrue();
+
+     // create some files inside snapshot ea823202-a62c-4603-bb6a-4e15d79091cd
+     assertThat(new File(dataDir0, "data/ks2/table2/snapshots/ea823202-a62c-4603-bb6a-4e15d79091cd/data.db")
+                .createNewFile()).isTrue();
+     assertThat(new File(dataDir0, "data/ks2/table2/snapshots/ea823202-a62c-4603-bb6a-4e15d79091cd/index.db")
+                .createNewFile()).isTrue();
+     assertThat(
+     new File(dataDir0, "data/ks2/table2/snapshots/ea823202-a62c-4603-bb6a-4e15d79091cd/nb-203-big-TOC.txt")
+     .createNewFile()).isTrue();
+
+     // create some files inside snapshot snapshot1 in dataDir1
+     assertThat(new File(dataDir1, "ks3/table3/snapshots/snapshot1/data.db").createNewFile()).isTrue();
+     assertThat(new File(dataDir1, "ks3/table3/snapshots/snapshot1/index.db").createNewFile()).isTrue();
+     assertThat(new File(dataDir1, "ks3/table3/snapshots/snapshot1/nb-203-big-TOC.txt").createNewFile()).isTrue();
+
+     assertThat(new File(dataDir1, "data/ks4/table4abc-a72c8740a57611ec935db766a70c44a1" +
+                                   "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/data.db").createNewFile())
+     .isTrue();
+     assertThat(new File(dataDir1, "data/ks4/table4abc-a72c8740a57611ec935db766a70c44a1" +
+                                   "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/index.db").createNewFile())
+     .isTrue();
+     assertThat(new File(dataDir1, "data/ks4/table4abc-a72c8740a57611ec935db766a70c44a1" +
+                                   "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/nb-203-big-TOC.txt")
+                .createNewFile()).isTrue();
+
+     vertx = Vertx.vertx();
+     instance = initialize(vertx, mockInstancesConfig);
+ }
+
+ /**
+  * Releases the Vert.x instance held in {@code vertx} once a test has finished.
+  *
+  * <p>The data directories are intentionally not deleted here. They are {@code @TempDir} fields, and the
+  * previous {@code assertThat(dir.delete())} calls neither asserted anything (no terminal assertion) nor
+  * could succeed, because {@link File#delete()} does not remove a directory that still has content.
+  */
+ @AfterEach
+ void clear()
+ {
+     // close() is asynchronous; nothing later in the test depends on its completion
+     vertx.close();
+ }
+
+ abstract SnapshotPathBuilder initialize(Vertx vertx, InstancesConfig instancesConfig); // supplies the builder that setup() stores in 'instance'
+
+ /** A {@code null} keyspace is reported as a {@link NullPointerException}. */
+ @Test
+ void failsWhenKeyspaceIsNull()
+ {
+     // the request is created inside the lambda so a failure in its constructor is captured as well
+     assertThatThrownBy(() -> {
+         StreamSSTableComponentRequest request =
+         new StreamSSTableComponentRequest(null, "table", "snapshot", "component");
+         instance.build("localhost", request);
+     }).isInstanceOf(NullPointerException.class)
+       .hasMessageContaining("keyspace must not be null");
+ }
+
+ /** The keyspace {@code i_❤_u} is rejected with a 400 response whose payload names the keyspace. */
+ @Test
+ void failsWhenKeyspaceContainsInvalidCharacters()
+ {
+     assertThatThrownBy(() -> {
+         StreamSSTableComponentRequest request =
+         new StreamSSTableComponentRequest("i_❤_u", "table", "snapshot", "component");
+         instance.build("localhost", request);
+     }).isInstanceOf(HttpException.class)
+       .hasMessageContaining("Bad Request")
+       .returns(HttpResponseStatus.BAD_REQUEST.code(),
+                from(thrown -> ((HttpException) thrown).getStatusCode()))
+       .returns("Invalid characters in keyspace: i_❤_u",
+                from(thrown -> ((HttpException) thrown).getPayload()));
+ }
+
+ /** A keyspace made of relative path segments is rejected with a 400 response. */
+ @Test
+ void failsWhenKeyspaceContainsPathTraversalAttack()
+ {
+     assertThatThrownBy(() -> {
+         StreamSSTableComponentRequest request =
+         new StreamSSTableComponentRequest("../../../etc/passwd", "table", "snapshot", "component");
+         instance.build("localhost", request);
+     }).isInstanceOf(HttpException.class)
+       .hasMessageContaining("Bad Request")
+       .returns(HttpResponseStatus.BAD_REQUEST.code(),
+                from(thrown -> ((HttpException) thrown).getStatusCode()))
+       .returns("Invalid characters in keyspace: ../../../etc/passwd",
+                from(thrown -> ((HttpException) thrown).getPayload()));
+ }
+
+ /**
+  * Each listed keyspace is rejected with a 403 response whose payload names the keyspace.
+  *
+  * @param forbiddenKeyspace the keyspace name supplied by {@code @ValueSource}
+  */
+ @ParameterizedTest
+ @ValueSource(strings = { "system_schema", "system_traces", "system_distributed", "system", "system_auth",
+                          "system_views", "system_virtual_schema" })
+ void failsWhenKeyspaceIsForbidden(String forbiddenKeyspace)
+ {
+     assertThatThrownBy(() -> {
+         StreamSSTableComponentRequest request =
+         new StreamSSTableComponentRequest(forbiddenKeyspace, "table", "snapshot", "component");
+         instance.build("localhost", request);
+     }).isInstanceOf(HttpException.class)
+       .hasMessageContaining("Forbidden")
+       .returns(HttpResponseStatus.FORBIDDEN.code(),
+                from(thrown -> ((HttpException) thrown).getStatusCode()))
+       .returns("Forbidden keyspace: " + forbiddenKeyspace,
+                from(thrown -> ((HttpException) thrown).getPayload()));
+ }
+
+ /** A {@code null} table name is reported as a {@link NullPointerException}. */
+ @Test
+ void failsWhenTableNameIsNull()
+ {
+     // the request is created inside the lambda so a failure in its constructor is captured as well
+     assertThatThrownBy(() -> {
+         StreamSSTableComponentRequest request =
+         new StreamSSTableComponentRequest("ks", null, "snapshot", "component");
+         instance.build("localhost", request);
+     }).isInstanceOf(NullPointerException.class)
+       .hasMessageContaining("tableName must not be null");
+ }
+
+ /** The table name {@code i_❤_u} is rejected with a 400 response whose payload names the table. */
+ @Test
+ void failsWhenTableNameContainsInvalidCharacters()
+ {
+     assertThatThrownBy(() -> {
+         StreamSSTableComponentRequest request =
+         new StreamSSTableComponentRequest("ks", "i_❤_u", "snapshot", "component");
+         instance.build("localhost", request);
+     }).isInstanceOf(HttpException.class)
+       .hasMessageContaining("Bad Request")
+       .returns(HttpResponseStatus.BAD_REQUEST.code(),
+                from(thrown -> ((HttpException) thrown).getStatusCode()))
+       .returns("Invalid characters in table name: i_❤_u",
+                from(thrown -> ((HttpException) thrown).getPayload()));
+ }
+
+ /** A table name made of relative path segments is rejected with a 400 response. */
+ @Test
+ void failsWhenTableNameContainsPathTraversalAttack()
+ {
+     assertThatThrownBy(() -> {
+         StreamSSTableComponentRequest request =
+         new StreamSSTableComponentRequest("ks", "../../../etc/passwd", "snapshot", "component");
+         instance.build("localhost", request);
+     }).isInstanceOf(HttpException.class)
+       .hasMessageContaining("Bad Request")
+       .returns(HttpResponseStatus.BAD_REQUEST.code(),
+                from(thrown -> ((HttpException) thrown).getStatusCode()))
+       .returns("Invalid characters in table name: ../../../etc/passwd",
+                from(thrown -> ((HttpException) thrown).getPayload()));
+ }
+
+ /** A {@code null} snapshot name is reported as a {@link NullPointerException}. */
+ @Test
+ void failsWhenSnapshotNameIsNull()
+ {
+     // the request is created inside the lambda so a failure in its constructor is captured as well
+     assertThatThrownBy(() -> {
+         StreamSSTableComponentRequest request =
+         new StreamSSTableComponentRequest("ks", "table", null, "component.db");
+         instance.build("localhost", request);
+     }).isInstanceOf(NullPointerException.class)
+       .hasMessageContaining("snapshotName must not be null");
+ }
+
+ /**
+  * Each listed snapshot name is rejected with a 400 response whose payload names the snapshot.
+  *
+  * @param invalidFileName the snapshot name supplied by {@code @ValueSource}
+  */
+ @ParameterizedTest
+ @ValueSource(strings = { "slash/is-not-allowed", "null-char\0-is-not-allowed", "../../../etc/passwd" })
+ void failsWhenSnapshotNameContainsInvalidCharacters(String invalidFileName)
+ {
+     assertThatThrownBy(() -> {
+         StreamSSTableComponentRequest request =
+         new StreamSSTableComponentRequest("ks", "table", invalidFileName, "component.db");
+         instance.build("localhost", request);
+     }).isInstanceOf(HttpException.class)
+       .hasMessageContaining("Bad Request")
+       .returns(HttpResponseStatus.BAD_REQUEST.code(),
+                from(thrown -> ((HttpException) thrown).getStatusCode()))
+       .returns("Invalid characters in snapshot name: " + invalidFileName,
+                from(thrown -> ((HttpException) thrown).getPayload()));
+ }
+
+ /** A {@code null} component name is reported as a {@link NullPointerException}. */
+ @Test
+ void failsWhenComponentNameIsNull()
+ {
+     // the request is created inside the lambda so a failure in its constructor is captured as well
+     assertThatThrownBy(() -> {
+         StreamSSTableComponentRequest request =
+         new StreamSSTableComponentRequest("ks", "table", "snapshot", null);
+         instance.build("localhost", request);
+     }).isInstanceOf(NullPointerException.class)
+       .hasMessageContaining("componentName must not be null");
+ }
+
+ /**
+  * Each listed component name is rejected with a 400 response whose payload names the component.
+  *
+  * @param invalidComponentName the component name supplied by {@code @ValueSource}
+  */
+ @ParameterizedTest
+ @ValueSource(strings = { "i_❤_u.db", "this-is-not-allowed.jar", "cql-is-not-allowed-here.cql",
+                          "json-is-not-allowed-here.json", "crc32-is-not-allowed-here.crc32",
+                          "../../../etc/passwd.db" })
+ void failsWhenComponentNameContainsInvalidCharacters(String invalidComponentName)
+ {
+     assertThatThrownBy(() -> {
+         StreamSSTableComponentRequest request =
+         new StreamSSTableComponentRequest("ks", "table", "snapshot", invalidComponentName);
+         instance.build("localhost", request);
+     }).isInstanceOf(HttpException.class)
+       .hasMessageContaining("Bad Request")
+       .returns(HttpResponseStatus.BAD_REQUEST.code(),
+                from(thrown -> ((HttpException) thrown).getStatusCode()))
+       .returns("Invalid component name: " + invalidComponentName,
+                from(thrown -> ((HttpException) thrown).getPayload()));
+ }
+
+ /** A host whose instance metadata lists no data directories fails with a {@link FileNotFoundException}. */
+ @Test
+ void failsWhenDataDirsAreEmpty() throws InterruptedException
+ {
+     StreamSSTableComponentRequest request =
+     new StreamSSTableComponentRequest("ks", "table", "snapshot", "component.db");
+     Future<String> pathFuture = instance.build("emptyDataDirInstance", request);
+
+     failsWithFileNotFoundException(pathFuture,
+                                    "No data directories are available for host 'emptyDataDirInstance'");
+ }
+
+ /** A host whose only data directory was never created reports the keyspace as missing. */
+ @Test
+ void failsWhenInvalidDataDirectory() throws InterruptedException
+ {
+     StreamSSTableComponentRequest request =
+     new StreamSSTableComponentRequest("ks", "table", "snapshot", "component.db");
+     Future<String> pathFuture = instance.build("invalidDataDirInstance", request);
+
+     failsWithFileNotFoundException(pathFuture, "Keyspace 'ks' does not exist");
+ }
+
+ /** A keyspace with no directory in any data directory is reported as missing. */
+ @Test
+ void failsWhenKeyspaceDirectoryDoesNotExist() throws InterruptedException
+ {
+     StreamSSTableComponentRequest request =
+     new StreamSSTableComponentRequest("non_existent", "table", "snapshot", "component.db");
+     Future<String> pathFuture = instance.build("localhost", request);
+
+     failsWithFileNotFoundException(pathFuture, "Keyspace 'non_existent' does not exist");
+ }
+
+ /** A keyspace name that matches a regular file (created in setup) is reported as missing. */
+ @Test
+ void failsWhenKeyspaceIsNotADirectory() throws InterruptedException
+ {
+     StreamSSTableComponentRequest request =
+     new StreamSSTableComponentRequest("not_a_keyspace_dir", "table", "snapshot", "component.db");
+     Future<String> pathFuture = instance.build("localhost", request);
+
+     failsWithFileNotFoundException(pathFuture, "Keyspace 'not_a_keyspace_dir' does not exist");
+ }
+
+ /** A table with no directory inside an existing keyspace is reported as missing. */
+ @Test
+ void failsWhenTableDoesNotExist() throws InterruptedException
+ {
+     StreamSSTableComponentRequest request =
+     new StreamSSTableComponentRequest("ks1", "non_existent", "snapshot", "component.db");
+     Future<String> pathFuture = instance.build("localhost", request);
+
+     failsWithFileNotFoundException(pathFuture, "Table 'non_existent' does not exist");
+ }
+
+ /** A table name that is only a prefix of an existing table directory is reported as missing. */
+ @Test
+ void failsWhenTableDoesNotExistWithSimilarPrefix() throws InterruptedException
+ {
+     // ks1 holds a directory whose name starts with "table" (table1, created in setup),
+     // but none named exactly "table"
+     StreamSSTableComponentRequest request =
+     new StreamSSTableComponentRequest("ks1", "table", "snapshot", "component.db");
+     Future<String> pathFuture = instance.build("localhost", request);
+
+     failsWithFileNotFoundException(pathFuture, "Table 'table' does not exist");
+ }
+
+ /** A table name that matches a regular file (created in setup) is reported as missing. */
+ @Test
+ void failsWhenTableNameIsNotADirectory() throws InterruptedException
+ {
+     StreamSSTableComponentRequest request =
+     new StreamSSTableComponentRequest("ks1", "not_a_table_dir", "snapshot", "component.db");
+     Future<String> pathFuture = instance.build("localhost", request);
+
+     failsWithFileNotFoundException(pathFuture, "Table 'not_a_table_dir' does not exist");
+ }
+
+ /** A snapshot with no directory under the table is reported through the component-not-found message. */
+ @Test
+ void failsWhenSnapshotDirectoryDoesNotExist() throws InterruptedException
+ {
+     StreamSSTableComponentRequest request =
+     new StreamSSTableComponentRequest("ks1", "table1", "non_existent", "component.db");
+     Future<String> pathFuture = instance.build("localhost", request);
+
+     failsWithFileNotFoundException(pathFuture,
+                                    "Component 'component.db' does not exist for snapshot 'non_existent'");
+ }
+
+ /** A snapshot name that matches a regular file (created in setup) is reported as component-not-found. */
+ @Test
+ void failsWhenSnapshotIsNotADirectory() throws InterruptedException
+ {
+     StreamSSTableComponentRequest request =
+     new StreamSSTableComponentRequest("ks1", "table1", "not_a_snapshot_dir", "component.db");
+     Future<String> pathFuture = instance.build("localhost", request);
+
+     failsWithFileNotFoundException(pathFuture,
+                                    "Component 'component.db' does not exist for snapshot 'not_a_snapshot_dir'");
+ }
+
+ /** A component file that is absent from an existing snapshot directory is reported as missing. */
+ @Test
+ void failsWhenComponentFileDoesNotExist() throws InterruptedException
+ {
+     StreamSSTableComponentRequest request =
+     new StreamSSTableComponentRequest("ks1", "table1", "backup.2022-03-17-04-PDT", "does-not-exist-TOC.txt");
+     Future<String> pathFuture = instance.build("localhost", request);
+
+     failsWithFileNotFoundException(
+     pathFuture,
+     "Component 'does-not-exist-TOC.txt' does not exist for snapshot 'backup.2022-03-17-04-PDT'");
+ }
+
+ /** A component name that matches a directory (created in setup) is reported as missing. */
+ @Test
+ void failsWhenComponentIsNotAFile() throws InterruptedException
+ {
+     StreamSSTableComponentRequest request =
+     new StreamSSTableComponentRequest("ks1", "table1", "backup.2022-03-17-04-PDT", "not_a_file.db");
+     Future<String> pathFuture = instance.build("localhost", request);
+
+     failsWithFileNotFoundException(
+     pathFuture,
+     "Component 'not_a_file.db' does not exist for snapshot 'backup.2022-03-17-04-PDT'");
+ }
+
+ /**
+  * Resolves every component file created in {@code setup()} and checks the returned path.
+  *
+  * <p>Each row below names a keyspace, table and snapshot together with the directory the components are
+  * expected to be found in; the three component files are looked up for each row, in order.
+  */
+ @Test
+ void succeedsWhenComponentExists() throws Exception
+ {
+     String[] components = { "data.db", "index.db", "nb-203-big-TOC.txt" };
+     String[][] lookups = {
+         // keyspace, table, snapshot, expected snapshot directory
+         { "ks1", "table1", "backup.2022-03-17-04-PDT",
+           dataDir0.getAbsolutePath() + "/ks1/table1/snapshots/backup.2022-03-17-04-PDT/" },
+         { "ks2", "table2", "ea823202-a62c-4603-bb6a-4e15d79091cd",
+           dataDir0.getAbsolutePath() + "/data/ks2/table2/snapshots/ea823202-a62c-4603-bb6a-4e15d79091cd/" },
+         { "ks3", "table3", "snapshot1",
+           dataDir1.getAbsolutePath() + "/ks3/table3/snapshots/snapshot1/" },
+         // the table is requested as "table4abc"; its directory carries an additional suffix
+         { "ks4", "table4abc", "this_is_a_valid_snapshot_name_i_❤_u",
+           dataDir1.getAbsolutePath() + "/data/ks4/table4abc-a72c8740a57611ec935db766a70c44a1"
+           + "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/" }
+     };
+
+     for (String[] lookup : lookups)
+     {
+         for (String component : components)
+         {
+             StreamSSTableComponentRequest request =
+             new StreamSSTableComponentRequest(lookup[0], lookup[1], lookup[2], component);
+             succeedsWhenComponentExists(instance.build("localhost", request), lookup[3] + component);
+         }
+     }
+ }
+
+ /**
+  * With both {@code a_table} and {@code a_table-<TABLE_UUID>} present in the keyspace directory, the
+  * components must be resolved from the directory that carries the UUID suffix.
+  *
+  * <p>Note that the UUID-suffixed directory is also given the later modification time, so this test does
+  * not distinguish between the two criteria.
+  *
+  * <p>Fixture creation results are asserted with {@code isTrue()} (a bare {@code assertThat(boolean)}
+  * checks nothing), and the manually managed {@link TemporaryFolder} is deleted in a {@code finally}
+  * block because no JUnit 4 rule runner is involved to clean it up.
+  *
+  * @throws IOException          if the fixture cannot be created
+  * @throws InterruptedException declared for compatibility with existing callers and overrides
+  */
+ @Test
+ void testTableWithUUIDPicked() throws IOException, InterruptedException
+ {
+     TemporaryFolder tempFolder = new TemporaryFolder();
+     tempFolder.create();
+     try
+     {
+         File dataDir = tempFolder.newFolder("data");
+
+         InstancesConfig mockInstancesConfig = mock(InstancesConfig.class);
+         InstanceMetadata mockInstanceMeta = mock(InstanceMetadata.class);
+
+         when(mockInstancesConfig.instanceFromHost("localhost")).thenReturn(mockInstanceMeta);
+         when(mockInstanceMeta.dataDirs()).thenReturn(Arrays.asList(dataDir.getAbsolutePath()));
+
+         String[] components = { "data.db", "index.db", "nb-203-big-TOC.txt" };
+
+         File atable = new File(dataDir, "data/ks1/a_table");
+         assertThat(atable.mkdirs()).isTrue();
+         File atableSnapshot = new File(atable, "snapshots/a_snapshot");
+         assertThat(atableSnapshot.mkdirs()).isTrue();
+         for (String component : components)
+         {
+             assertThat(new File(atable, "snapshots/a_snapshot/" + component).createNewFile()).isTrue();
+         }
+
+         File atableWithUUID = new File(dataDir, "data/ks1/a_table-a72c8740a57611ec935db766a70c44a1");
+         assertThat(atableWithUUID.mkdirs()).isTrue();
+         File atableWithUUIDSnapshot = new File(atableWithUUID, "snapshots/a_snapshot");
+         assertThat(atableWithUUIDSnapshot.mkdirs()).isTrue();
+         for (String component : components)
+         {
+             assertThat(new File(atableWithUUID, "snapshots/a_snapshot/" + component).createNewFile()).isTrue();
+         }
+         assertThat(atableWithUUID.setLastModified(System.currentTimeMillis() + 2000000)).isTrue();
+
+         // a_table and a_table-<TABLE_UUID> - the latter should be picked
+         SnapshotPathBuilder newBuilder = new SnapshotPathBuilder(vertx.fileSystem(), mockInstancesConfig);
+         for (String component : components)
+         {
+             String expectedPath = atableWithUUID.getAbsolutePath() + "/snapshots/a_snapshot/" + component;
+             succeedsWhenComponentExists(newBuilder
+                                         .build("localhost",
+                                                new StreamSSTableComponentRequest("ks1",
+                                                                                  "a_table",
+                                                                                  "a_snapshot",
+                                                                                  component)),
+                                         expectedPath);
+         }
+     }
+     finally
+     {
+         // remove the folder even when an assertion above fails
+         tempFolder.delete();
+     }
+ }
+
+ /**
+  * With two directories for the same table name (the table was dropped and recreated, so it received a
+  * new UUID suffix), the components must be resolved from the most recently modified directory.
+  *
+  * <p>Fixture creation results are asserted with {@code isTrue()} (a bare {@code assertThat(boolean)}
+  * checks nothing), and the manually managed {@link TemporaryFolder} is deleted in a {@code finally}
+  * block because no JUnit 4 rule runner is involved to clean it up.
+  *
+  * @throws IOException          if the fixture cannot be created
+  * @throws InterruptedException declared for compatibility with existing callers and overrides
+  */
+ @Test
+ void testLastModifiedTablePicked() throws IOException, InterruptedException
+ {
+     TemporaryFolder tempFolder = new TemporaryFolder();
+     tempFolder.create();
+     try
+     {
+         File dataDir = tempFolder.newFolder("data");
+
+         InstancesConfig mockInstancesConfig = mock(InstancesConfig.class);
+         InstanceMetadata mockInstanceMeta = mock(InstanceMetadata.class);
+
+         when(mockInstancesConfig.instanceFromHost("localhost")).thenReturn(mockInstanceMeta);
+         when(mockInstanceMeta.dataDirs()).thenReturn(Arrays.asList(dataDir.getAbsolutePath()));
+
+         String[] components = { "data.db", "index.db", "nb-203-big-TOC.txt" };
+         String snapshotDir = "snapshots/this_is_a_valid_snapshot_name_i_❤_u";
+
+         File table4Old = new File(dataDir, "data/ks4/table4-a6442310a57611ec8b980b0b2009844e1");
+         assertThat(table4Old.mkdirs()).isTrue();
+
+         // table was dropped and recreated. The table gets a new uuid
+         File table4OldSnapshot = new File(table4Old, snapshotDir);
+         assertThat(table4OldSnapshot.mkdirs()).isTrue();
+         // create some files inside the snapshot of the old table directory
+         for (String component : components)
+         {
+             assertThat(new File(table4Old, snapshotDir + "/" + component).createNewFile()).isTrue();
+         }
+
+         File table4New = new File(dataDir, "data/ks4/table4-a72c8740a57611ec935db766a70c44a11");
+         assertThat(table4New.mkdirs()).isTrue();
+
+         File table4NewSnapshot = new File(table4New, snapshotDir);
+         assertThat(table4NewSnapshot.mkdirs()).isTrue();
+         for (String component : components)
+         {
+             assertThat(new File(table4New, snapshotDir + "/" + component).createNewFile()).isTrue();
+         }
+         assertThat(table4New.setLastModified(System.currentTimeMillis() + 2000000)).isTrue();
+
+         SnapshotPathBuilder newBuilder = new SnapshotPathBuilder(vertx.fileSystem(), mockInstancesConfig);
+         // table4New is the last modified directory, so it is the correct one
+         for (String component : components)
+         {
+             String expectedPath = table4New.getAbsolutePath()
+                                   + "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/" + component;
+             succeedsWhenComponentExists(newBuilder
+                                         .build("localhost",
+                                                new StreamSSTableComponentRequest("ks4",
+                                                                                  "table4",
+                                                                                  "this_is_a_valid_snapshot_name_i_❤_u",
+                                                                                  component)),
+                                         expectedPath);
+         }
+     }
+     finally
+     {
+         // remove the folder even when an assertion above fails
+         tempFolder.delete();
+     }
+ }
+
+ /**
+  * Waits up to five seconds for {@code future} and asserts that it succeeded with a path ending in
+  * {@code expectedPath}.
+  *
+  * @param future       the pending result of a path lookup
+  * @param expectedPath the suffix the resolved path must have
+  */
+ protected void succeedsWhenComponentExists(Future<String> future, String expectedPath)
+ {
+     VertxTestContext context = new VertxTestContext();
+     future.onComplete(context.succeedingThenComplete());
+
+     boolean finishedInTime;
+     try
+     {
+         // awaitCompletion has the semantics of a java.util.concurrent.CountDownLatch
+         finishedInTime = context.awaitCompletion(5, TimeUnit.SECONDS);
+     }
+     catch (InterruptedException e)
+     {
+         throw new RuntimeException(e);
+     }
+
+     assertThat(finishedInTime).isTrue();
+     assertThat(context.failed()).isFalse();
+     // we use ends with here, because MacOS prepends the /private path for temporary directories
+     String resolvedPath = future.result();
+     assertThat(resolvedPath).endsWith(expectedPath);
+ }
+
+ /**
+  * Waits up to five seconds for {@code future} and asserts that the test context was marked as failed
+  * with a {@link FileNotFoundException} carrying exactly {@code expectedMessage}.
+  *
+  * @param future          the pending result of a path lookup
+  * @param expectedMessage the exact message expected on the failure cause
+  */
+ protected void failsWithFileNotFoundException(Future<String> future, String expectedMessage)
+ {
+     VertxTestContext context = new VertxTestContext();
+     future.onComplete(context.succeedingThenComplete());
+
+     boolean finishedInTime;
+     try
+     {
+         // awaitCompletion has the semantics of a java.util.concurrent.CountDownLatch
+         finishedInTime = context.awaitCompletion(5, TimeUnit.SECONDS);
+     }
+     catch (InterruptedException e)
+     {
+         throw new RuntimeException(e);
+     }
+
+     assertThat(finishedInTime).isTrue();
+     assertThat(context.failed()).isTrue();
+     Throwable cause = context.causeOfFailure();
+     assertThat(cause).isInstanceOf(FileNotFoundException.class)
+                      .returns(expectedMessage, from(Throwable::getMessage));
+ }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilderTest.java b/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilderTest.java
new file mode 100644
index 0000000..4cf8688
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilderTest.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.sidecar.snapshots;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.Vertx;
+import io.vertx.junit5.VertxExtension;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+
+ /**
+  * Runs the tests inherited from {@link AbstractSnapshotPathBuilderTest} against {@link SnapshotPathBuilder}.
+  */
+ @ExtendWith(VertxExtension.class)
+ class SnapshotPathBuilderTest extends AbstractSnapshotPathBuilderTest
+ {
+     /**
+      * Creates the builder under test on top of the given Vert.x file system.
+      *
+      * @param vertx           the Vert.x instance whose file system the builder uses
+      * @param instancesConfig the instances configuration handed to the builder
+      * @return a new {@link SnapshotPathBuilder}
+      */
+     @Override
+     SnapshotPathBuilder initialize(Vertx vertx, InstancesConfig instancesConfig)
+     {
+         return new SnapshotPathBuilder(vertx.fileSystem(), instancesConfig);
+     }
+ }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org