You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by jz...@apache.org on 2018/03/29 17:44:10 UTC

cassandra git commit: Add sstableloader option to accept target keyspace name

Repository: cassandra
Updated Branches:
  refs/heads/trunk fc2e420fd -> c22ee2bd4


Add sstableloader option to accept target keyspace name

patch by Jaydeepkumar Chovatia; reviewed by Jay Zhuang for CASSANDRA-13884


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c22ee2bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c22ee2bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c22ee2bd

Branch: refs/heads/trunk
Commit: c22ee2bd451d030e99cfb65be839bbc735a5352f
Parents: fc2e420
Author: jaydeepkumar1984 <ch...@gmail.com>
Authored: Mon Sep 18 17:07:56 2017 -0700
Committer: Jay Zhuang <ja...@yahoo.com>
Committed: Thu Mar 29 10:41:27 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/io/sstable/SSTableLoader.java     |  6 +--
 .../org/apache/cassandra/tools/BulkLoader.java  |  3 +-
 .../apache/cassandra/tools/LoaderOptions.java   | 14 ++++++
 .../cassandra/io/sstable/SSTableLoaderTest.java | 48 ++++++++++++++++++++
 5 files changed, 68 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c0551b7..04705ba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add sstableloader option to accept target keyspace name (CASSANDRA-13884)
  * Move processing of EchoMessage response to gossip stage (CASSANDRA-13713)
  * Add coordinator write metric per CF (CASSANDRA-14232)
  * Fix scheduling of speculative retry threshold recalculation (CASSANDRA-14338)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 980fdf1..a6985f7 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -56,13 +56,13 @@ public class SSTableLoader implements StreamEventHandler
 
     public SSTableLoader(File directory, Client client, OutputHandler outputHandler)
     {
-        this(directory, client, outputHandler, 1);
+        this(directory, client, outputHandler, 1, null);
     }
 
-    public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost)
+    public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost, String targetKeyspace)
     {
         this.directory = directory;
-        this.keyspace = directory.getParentFile().getName();
+        this.keyspace = targetKeyspace != null ? targetKeyspace : directory.getParentFile().getName();
         this.client = client;
         this.outputHandler = outputHandler;
         this.connectionsPerHost = connectionsPerHost;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 545d1f7..d85c605 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -65,7 +65,8 @@ public class BulkLoader
                         buildSSLOptions(options.clientEncOptions),
                         options.allowServerPortDiscovery),
                         handler,
-                        options.connectionsPerHost);
+                        options.connectionsPerHost,
+                        options.targetKeyspace);
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
         DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(options.interDcThrottle);
         StreamResultFuture future = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/src/java/org/apache/cassandra/tools/LoaderOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java
index 3686584..d6cb670 100644
--- a/src/java/org/apache/cassandra/tools/LoaderOptions.java
+++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
 import com.datastax.driver.core.AuthProvider;
 import com.datastax.driver.core.PlainTextAuthProvider;
 import org.apache.commons.cli.*;
+import org.apache.commons.lang3.StringUtils;
 
 public class LoaderOptions
 {
@@ -59,6 +60,7 @@ public class LoaderOptions
     public static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle";
     public static final String TOOL_NAME = "sstableloader";
     public static final String ALLOW_SERVER_PORT_DISCOVERY_OPTION = "server-port-discovery";
+    public static final String TARGET_KEYSPACE = "target-keyspace";
 
     /* client encryption options */
     public static final String SSL_TRUSTSTORE = "truststore";
@@ -88,6 +90,7 @@ public class LoaderOptions
     public final Set<InetSocketAddress> hosts;
     public final Set<InetAddressAndPort> ignores;
     public final boolean allowServerPortDiscovery;
+    public final String targetKeyspace;
 
     LoaderOptions(Builder builder)
     {
@@ -109,6 +112,7 @@ public class LoaderOptions
         hosts = builder.hosts;
         allowServerPortDiscovery = builder.allowServerPortDiscovery;
         ignores = builder.ignores;
+        targetKeyspace = builder.targetKeyspace;
     }
 
     static class Builder
@@ -134,6 +138,7 @@ public class LoaderOptions
         Set<InetSocketAddress> hosts = new HashSet<>();
         Set<InetAddressAndPort> ignores = new HashSet<>();
         boolean allowServerPortDiscovery;
+        String targetKeyspace;
 
         Builder()
         {
@@ -509,6 +514,14 @@ public class LoaderOptions
                     clientEncOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
                 }
 
+                if (cmd.hasOption(TARGET_KEYSPACE))
+                {
+                    targetKeyspace = cmd.getOptionValue(TARGET_KEYSPACE);
+                    if (StringUtils.isBlank(targetKeyspace))
+                    {
+                        errorMsg("Empty keyspace is not supported.", options);
+                    }
+                }
                 return this;
             }
             catch (ParseException | ConfigurationException | MalformedURLException e)
@@ -615,6 +628,7 @@ public class LoaderOptions
         options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use");
         options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL.");
         options.addOption("spd", ALLOW_SERVER_PORT_DISCOVERY_OPTION, "allow server port discovery", "Use ports published by server to decide how to connect. With SSL requires StartTLS to be used.");
+        options.addOption("k", TARGET_KEYSPACE, "target keyspace name", "target keyspace name");
         return options;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index 430b7c2..8509115 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -56,6 +56,7 @@ import static org.junit.Assert.assertTrue;
 public class SSTableLoaderTest
 {
     public static final String KEYSPACE1 = "SSTableLoaderTest";
+    public static final String KEYSPACE2 = "SSTableLoaderTest1";
     public static final String CF_STANDARD1 = "Standard1";
     public static final String CF_STANDARD2 = "Standard2";
 
@@ -70,6 +71,11 @@ public class SSTableLoaderTest
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
 
+        SchemaLoader.createKeyspace(KEYSPACE2,
+                KeyspaceParams.simple(1),
+                SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD1),
+                SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD2));
+
         StorageService.instance.initServer();
     }
 
@@ -208,6 +214,48 @@ public class SSTableLoaderTest
         latch.await();
     }
 
+    @Test
+    public void testLoadingSSTableToDifferentKeyspace() throws Exception
+    {
+        File dataDir = new File(tmpdir.getAbsolutePath() + File.separator + KEYSPACE1 + File.separator + CF_STANDARD1);
+        assert dataDir.mkdirs();
+        TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD1);
+
+        String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
+        String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
+
+        try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                .inDirectory(dataDir)
+                .forTable(String.format(schema, KEYSPACE1, CF_STANDARD1))
+                .using(String.format(query, KEYSPACE1, CF_STANDARD1))
+                .build())
+        {
+            writer.addRow("key1", "col1", "100");
+        }
+
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+        cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false), 1, KEYSPACE2);
+        loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
+
+        cfs = Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1);
+        cfs.forceBlockingFlush();
+
+        List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
+
+        assertEquals(1, partitions.size());
+        assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
+        assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
+                .getCell(metadata.getColumn(ByteBufferUtil.bytes("val")))
+                .value());
+
+        // The stream future is signalled when the work is complete but before releasing references. Wait for release
+        // before cleanup (CASSANDRA-10118).
+        latch.await();
+    }
+
     StreamEventHandler completionStreamListener(final CountDownLatch latch)
     {
         return new StreamEventHandler() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org