You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2017/06/29 16:05:31 UTC
[3/6] cassandra git commit: Allow native function calls in CQLSSTableWriter
Allow native function calls in CQLSSTableWriter
Patch by Alex Petrov; reviewed by Joel Knighton for CASSANDRA-12606
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a1baeada
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a1baeada
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a1baeada
Branch: refs/heads/trunk
Commit: a1baeadab9d726d2ceeed795bb6efb13464dec4a
Parents: 5aec834
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Oct 11 09:51:50 2016 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Thu Jun 29 17:50:21 2017 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/CQLSSTableWriter.java | 7 +
.../io/sstable/CQLSSTableWriterTest.java | 149 +++++++++++++++----
3 files changed, 130 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1baeada/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d90d220..c5179e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.15
+ * Allow native function calls in CQLSSTableWriter (CASSANDRA-12606)
* Fix secondary index queries on COMPACT tables (CASSANDRA-13627)
* Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1baeada/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 81a3356..39f7339 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.cql3.statements.UpdateStatement;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.dht.IPartitioner;
@@ -40,6 +41,7 @@ import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.SchemaKeyspace;
import org.apache.cassandra.schema.Tables;
import org.apache.cassandra.schema.Types;
import org.apache.cassandra.service.ClientState;
@@ -354,6 +356,11 @@ public class CQLSSTableWriter implements Closeable
{
synchronized (CQLSSTableWriter.class)
{
+ if (Schema.instance.getKSMetaData(SchemaKeyspace.NAME) == null)
+ Schema.instance.load(SchemaKeyspace.metadata());
+ if (Schema.instance.getKSMetaData(SystemKeyspace.NAME) == null)
+ Schema.instance.load(SystemKeyspace.metadata());
+
this.schema = getTableMetadata(schema);
// We need to register the keyspace/table metadata through Schema, otherwise we won't be able to properly
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1baeada/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index e6d18c4..7d79036 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
@@ -42,10 +43,13 @@ import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.OutputHandler;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
public class CQLSSTableWriterTest
{
@@ -94,24 +98,7 @@ public class CQLSSTableWriterTest
writer.close();
- SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
- {
- private String keyspace;
-
- public void init(String keyspace)
- {
- this.keyspace = keyspace;
- for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace"))
- addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
- }
-
- public CFMetaData getTableMetadata(String cfName)
- {
- return Schema.instance.getCFMetaData(keyspace, cfName);
- }
- }, new OutputHandler.SystemOutput(false, false));
-
- loader.stream().get();
+ loadSSTables(dataDir, KS);
UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1;");
assertEquals(4, rs.size());
@@ -142,7 +129,7 @@ public class CQLSSTableWriterTest
}
}
- @Test(expected = IllegalArgumentException.class)
+ @Test
public void testForbidCounterUpdates() throws Exception
{
String KS = "cql_keyspace";
@@ -158,10 +145,18 @@ public class CQLSSTableWriterTest
" PRIMARY KEY (my_id)" +
")";
String insert = String.format("UPDATE cql_keyspace.counter1 SET my_counter = my_counter - ? WHERE my_id = ?");
- CQLSSTableWriter.builder().inDirectory(dataDir)
- .forTable(schema)
- .withPartitioner(Murmur3Partitioner.instance)
- .using(insert).build();
+ try
+ {
+ CQLSSTableWriter.builder().inDirectory(dataDir)
+ .forTable(schema)
+ .withPartitioner(Murmur3Partitioner.instance)
+ .using(insert).build();
+ fail("Counter update statements should not be supported");
+ }
+ catch (IllegalArgumentException e)
+ {
+ assertEquals(e.getMessage(), "Counter update statements are not supported");
+ }
}
@Test
@@ -230,7 +225,102 @@ public class CQLSSTableWriterTest
}
+ @Test
+ public void testUpdateStatement() throws Exception
+ {
+ final String KS = "cql_keyspace6";
+ final String TABLE = "table6";
+ final String schema = "CREATE TABLE " + KS + "." + TABLE + " ("
+ + " k int,"
+ + " c1 int,"
+ + " c2 int,"
+ + " v text,"
+ + " PRIMARY KEY (k, c1, c2)"
+ + ")";
+
+ File tempdir = Files.createTempDir();
+ File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+ assert dataDir.mkdirs();
+
+ CQLSSTableWriter writer = CQLSSTableWriter.builder()
+ .inDirectory(dataDir)
+ .forTable(schema)
+ .using("UPDATE " + KS + "." + TABLE + " SET v = ? " +
+ "WHERE k = ? AND c1 = ? AND c2 = ?")
+ .build();
+
+ writer.addRow("a", 1, 2, 3);
+ writer.addRow("b", 4, 5, 6);
+ writer.addRow(null, 7, 8, 9);
+ writer.close();
+ loadSSTables(dataDir, KS);
+
+ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
+ assertEquals(2, resultSet.size());
+
+ Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
+ UntypedResultSet.Row r1 = iter.next();
+ assertEquals(1, r1.getInt("k"));
+ assertEquals(2, r1.getInt("c1"));
+ assertEquals(3, r1.getInt("c2"));
+ assertEquals("a", r1.getString("v"));
+ UntypedResultSet.Row r2 = iter.next();
+ assertEquals(4, r2.getInt("k"));
+ assertEquals(5, r2.getInt("c1"));
+ assertEquals(6, r2.getInt("c2"));
+ assertEquals("b", r2.getString("v"));
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void testNativeFunctions() throws Exception
+ {
+ final String KS = "cql_keyspace7";
+ final String TABLE = "table7";
+
+ final String schema = "CREATE TABLE " + KS + "." + TABLE + " ("
+ + " k int,"
+ + " c1 int,"
+ + " c2 int,"
+ + " v blob,"
+ + " PRIMARY KEY (k, c1, c2)"
+ + ")";
+
+ File tempdir = Files.createTempDir();
+ File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+ assert dataDir.mkdirs();
+
+ CQLSSTableWriter writer = CQLSSTableWriter.builder()
+ .inDirectory(dataDir)
+ .forTable(schema)
+ .using("INSERT INTO " + KS + "." + TABLE + " (k, c1, c2, v) VALUES (?, ?, ?, textAsBlob(?))")
+ .build();
+
+ writer.addRow(1, 2, 3, "abc");
+ writer.addRow(4, 5, 6, "efg");
+
+ writer.close();
+ loadSSTables(dataDir, KS);
+
+ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS + "." + TABLE);
+ assertEquals(2, resultSet.size());
+
+ Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
+ UntypedResultSet.Row r1 = iter.next();
+ assertEquals(1, r1.getInt("k"));
+ assertEquals(2, r1.getInt("c1"));
+ assertEquals(3, r1.getInt("c2"));
+ assertEquals(ByteBufferUtil.bytes("abc"), r1.getBytes("v"));
+
+ UntypedResultSet.Row r2 = iter.next();
+ assertEquals(4, r2.getInt("k"));
+ assertEquals(5, r2.getInt("c1"));
+ assertEquals(6, r2.getInt("c2"));
+ assertEquals(ByteBufferUtil.bytes("efg"), r2.getBytes("v"));
+
+ assertFalse(iter.hasNext());
+ }
private static final int NUMBER_WRITES_IN_RUNNABLE = 10;
private class WriterThread extends Thread
@@ -302,6 +392,14 @@ public class CQLSSTableWriterTest
}
}
+ loadSSTables(dataDir, KS);
+
+ UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;");
+ assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
+ }
+
+ private static void loadSSTables(File dataDir, String ks) throws ExecutionException, InterruptedException
+ {
SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
{
private String keyspace;
@@ -309,7 +407,7 @@ public class CQLSSTableWriterTest
public void init(String keyspace)
{
this.keyspace = keyspace;
- for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
+ for (Range<Token> range : StorageService.instance.getLocalRanges(ks))
addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
}
@@ -320,8 +418,5 @@ public class CQLSSTableWriterTest
}, new OutputHandler.SystemOutput(false, false));
loader.stream().get();
-
- UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;");
- assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org