You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jz...@apache.org on 2018/03/29 17:44:10 UTC
cassandra git commit: Add sstableloader option to accept target
keyspace name
Repository: cassandra
Updated Branches:
refs/heads/trunk fc2e420fd -> c22ee2bd4
Add sstableloader option to accept target keyspace name
patch by Jaydeepkumar Chovatia; reviewed by Jay Zhuang for CASSANDRA-13884
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c22ee2bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c22ee2bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c22ee2bd
Branch: refs/heads/trunk
Commit: c22ee2bd451d030e99cfb65be839bbc735a5352f
Parents: fc2e420
Author: jaydeepkumar1984 <ch...@gmail.com>
Authored: Mon Sep 18 17:07:56 2017 -0700
Committer: Jay Zhuang <ja...@yahoo.com>
Committed: Thu Mar 29 10:41:27 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 6 +--
.../org/apache/cassandra/tools/BulkLoader.java | 3 +-
.../apache/cassandra/tools/LoaderOptions.java | 14 ++++++
.../cassandra/io/sstable/SSTableLoaderTest.java | 48 ++++++++++++++++++++
5 files changed, 68 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c0551b7..04705ba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Add sstableloader option to accept target keyspace name (CASSANDRA-13884)
* Move processing of EchoMessage response to gossip stage (CASSANDRA-13713)
* Add coordinator write metric per CF (CASSANDRA-14232)
* Fix scheduling of speculative retry threshold recalculation (CASSANDRA-14338)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 980fdf1..a6985f7 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -56,13 +56,13 @@ public class SSTableLoader implements StreamEventHandler
public SSTableLoader(File directory, Client client, OutputHandler outputHandler)
{
- this(directory, client, outputHandler, 1);
+ this(directory, client, outputHandler, 1, null);
}
- public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost)
+ public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost, String targetKeyspace)
{
this.directory = directory;
- this.keyspace = directory.getParentFile().getName();
+ this.keyspace = targetKeyspace != null ? targetKeyspace : directory.getParentFile().getName();
this.client = client;
this.outputHandler = outputHandler;
this.connectionsPerHost = connectionsPerHost;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 545d1f7..d85c605 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -65,7 +65,8 @@ public class BulkLoader
buildSSLOptions(options.clientEncOptions),
options.allowServerPortDiscovery),
handler,
- options.connectionsPerHost);
+ options.connectionsPerHost,
+ options.targetKeyspace);
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(options.interDcThrottle);
StreamResultFuture future = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/src/java/org/apache/cassandra/tools/LoaderOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java
index 3686584..d6cb670 100644
--- a/src/java/org/apache/cassandra/tools/LoaderOptions.java
+++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
import com.datastax.driver.core.AuthProvider;
import com.datastax.driver.core.PlainTextAuthProvider;
import org.apache.commons.cli.*;
+import org.apache.commons.lang3.StringUtils;
public class LoaderOptions
{
@@ -59,6 +60,7 @@ public class LoaderOptions
public static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle";
public static final String TOOL_NAME = "sstableloader";
public static final String ALLOW_SERVER_PORT_DISCOVERY_OPTION = "server-port-discovery";
+ public static final String TARGET_KEYSPACE = "target-keyspace";
/* client encryption options */
public static final String SSL_TRUSTSTORE = "truststore";
@@ -88,6 +90,7 @@ public class LoaderOptions
public final Set<InetSocketAddress> hosts;
public final Set<InetAddressAndPort> ignores;
public final boolean allowServerPortDiscovery;
+ public final String targetKeyspace;
LoaderOptions(Builder builder)
{
@@ -109,6 +112,7 @@ public class LoaderOptions
hosts = builder.hosts;
allowServerPortDiscovery = builder.allowServerPortDiscovery;
ignores = builder.ignores;
+ targetKeyspace = builder.targetKeyspace;
}
static class Builder
@@ -134,6 +138,7 @@ public class LoaderOptions
Set<InetSocketAddress> hosts = new HashSet<>();
Set<InetAddressAndPort> ignores = new HashSet<>();
boolean allowServerPortDiscovery;
+ String targetKeyspace;
Builder()
{
@@ -509,6 +514,14 @@ public class LoaderOptions
clientEncOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
}
+ if (cmd.hasOption(TARGET_KEYSPACE))
+ {
+ targetKeyspace = cmd.getOptionValue(TARGET_KEYSPACE);
+ if (StringUtils.isBlank(targetKeyspace))
+ {
+ errorMsg("Empty keyspace is not supported.", options);
+ }
+ }
return this;
}
catch (ParseException | ConfigurationException | MalformedURLException e)
@@ -615,6 +628,7 @@ public class LoaderOptions
options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use");
options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL.");
options.addOption("spd", ALLOW_SERVER_PORT_DISCOVERY_OPTION, "allow server port discovery", "Use ports published by server to decide how to connect. With SSL requires StartTLS to be used.");
+ options.addOption("k", TARGET_KEYSPACE, "target keyspace name", "target keyspace name");
return options;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index 430b7c2..8509115 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -56,6 +56,7 @@ import static org.junit.Assert.assertTrue;
public class SSTableLoaderTest
{
public static final String KEYSPACE1 = "SSTableLoaderTest";
+ public static final String KEYSPACE2 = "SSTableLoaderTest1";
public static final String CF_STANDARD1 = "Standard1";
public static final String CF_STANDARD2 = "Standard2";
@@ -70,6 +71,11 @@ public class SSTableLoaderTest
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
+ SchemaLoader.createKeyspace(KEYSPACE2,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD1),
+ SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD2));
+
StorageService.instance.initServer();
}
@@ -208,6 +214,48 @@ public class SSTableLoaderTest
latch.await();
}
+ @Test
+ public void testLoadingSSTableToDifferentKeyspace() throws Exception
+ {
+ File dataDir = new File(tmpdir.getAbsolutePath() + File.separator + KEYSPACE1 + File.separator + CF_STANDARD1);
+ assert dataDir.mkdirs();
+ TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD1);
+
+ String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
+ String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
+
+ try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
+ .inDirectory(dataDir)
+ .forTable(String.format(schema, KEYSPACE1, CF_STANDARD1))
+ .using(String.format(query, KEYSPACE1, CF_STANDARD1))
+ .build())
+ {
+ writer.addRow("key1", "col1", "100");
+ }
+
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+ cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false), 1, KEYSPACE2);
+ loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
+
+ cfs = Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1);
+ cfs.forceBlockingFlush();
+
+ List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
+
+ assertEquals(1, partitions.size());
+ assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
+ assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
+ .getCell(metadata.getColumn(ByteBufferUtil.bytes("val")))
+ .value());
+
+ // The stream future is signalled when the work is complete but before releasing references. Wait for release
+ // before cleanup (CASSANDRA-10118).
+ latch.await();
+ }
+
StreamEventHandler completionStreamListener(final CountDownLatch latch)
{
return new StreamEventHandler() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org