You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/03/05 18:52:48 UTC
[3/6] cassandra git commit: Fix CQLSSTableWriter throwing exception
and spawning threads
Fix CQLSSTableWriter throwing exception and spawning threads
patch by Benjamin Lerer; reviewed by yukim for CASSANDRA-8808
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fb67c41a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fb67c41a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fb67c41a
Branch: refs/heads/trunk
Commit: fb67c41ad7faeff7a5f33e9e0bca6493a3febe89
Parents: e56d9ef
Author: blerer <be...@datastax.com>
Authored: Tue Mar 3 12:00:57 2015 +0100
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Mar 5 11:41:50 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../cql3/statements/UpdateStatement.java | 23 ++++
.../cassandra/io/sstable/CQLSSTableWriter.java | 50 ++++++--
.../io/sstable/CQLSSTableWriterClientTest.java | 116 +++++++++++++++++++
.../io/sstable/CQLSSTableWriterTest.java | 31 +++--
5 files changed, 198 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4e34c9e..faa14d5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,8 @@
* 'nodetool info' prints exception against older node (CASSANDRA-8796)
* Ensure SSTableSimpleUnsortedWriter.close() terminates if
disk writer has crashed (CASSANDRA-8807)
+ * Fix CQLSSTableWriter throwing exception and spawning threads
+ (CASSANDRA-8808)
2.0.12:
* Use more efficient slice size for querying internal secondary
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 594b5db..f34edaf 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -50,6 +50,15 @@ public class UpdateStatement extends ModificationStatement
public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
throws InvalidRequestException
{
+ addUpdateForKey(cf, key, builder, params, true);
+ }
+
+ public void addUpdateForKey(ColumnFamily cf,
+ ByteBuffer key,
+ ColumnNameBuilder builder,
+ UpdateParameters params,
+ boolean validateIndexedColumns) throws InvalidRequestException
+ {
CFDefinition cfDef = cfm.getCfDef();
if (builder.getLength() > FBUtilities.MAX_UNSIGNED_SHORT)
@@ -106,6 +115,20 @@ public class UpdateStatement extends ModificationStatement
update.execute(key, cf, builder.copy(), params);
}
+ // validateIndexedColumns trigger a call to Keyspace.open() which we want to be able to avoid in some case
+ //(e.g. when using CQLSSTableWriter)
+ if (validateIndexedColumns)
+ validateIndexedColumns(cf);
+ }
+
+ /**
+ * Checks that the value of the indexed columns is valid.
+ *
+ * @param cf the column family
+ * @throws InvalidRequestException if one of the values is invalid
+ */
+ private void validateIndexedColumns(ColumnFamily cf) throws InvalidRequestException
+ {
SecondaryIndexManager indexManager = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId).indexManager;
if (indexManager.hasIndexes())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 8006112..fb4c186 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -37,9 +37,9 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.utils.Allocator;
@@ -77,6 +77,11 @@ import org.apache.cassandra.utils.Pair;
*/
public class CQLSSTableWriter implements Closeable
{
+ static
+ {
+ Config.setClientMode(true);
+ }
+
private final AbstractSSTableSimpleWriter writer;
private final UpdateStatement insert;
private final List<ColumnSpecification> boundNames;
@@ -215,7 +220,7 @@ public class CQLSSTableWriter implements Closeable
{
if (writer.currentKey() == null || !key.equals(writer.currentKey().key))
writer.newRow(key);
- insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params);
+ insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params, false);
}
return this;
}
@@ -345,19 +350,11 @@ public class CQLSSTableWriter implements Closeable
KSMetaData ksm = Schema.instance.getKSMetaData(this.schema.ksName);
if (ksm == null)
{
- ksm = KSMetaData.newKeyspace(this.schema.ksName,
- AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
- ImmutableMap.of("replication_factor", "1"),
- true,
- Collections.singleton(this.schema));
- Schema.instance.load(ksm);
+ createKeyspaceWithColumnFamily(this.schema);
}
else if (Schema.instance.getCFMetaData(this.schema.ksName, this.schema.cfName) == null)
{
- Schema.instance.load(this.schema);
- ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(this.schema)));
- Schema.instance.setKeyspaceDefinition(ksm);
- Keyspace.open(ksm.name).initCf(this.schema.cfId, this.schema.cfName, false);
+ addColumnFamilyToKeyspace(ksm, this.schema);
}
return this;
}
@@ -369,6 +366,35 @@ public class CQLSSTableWriter implements Closeable
}
/**
+ * Adds the specified column family to the specified keyspace.
+ *
+ * @param ksm the keyspace meta data
+ * @param cfm the column family meta data
+ */
+ private static void addColumnFamilyToKeyspace(KSMetaData ksm, CFMetaData cfm)
+ {
+ ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(cfm)));
+ Schema.instance.load(cfm);
+ Schema.instance.setKeyspaceDefinition(ksm);
+ }
+
+ /**
+ * Creates a keyspace for the specified column family.
+ *
+ * @param cfm the column family
+ * @throws ConfigurationException if a problem occurs while creating the keyspace.
+ */
+ private static void createKeyspaceWithColumnFamily(CFMetaData cfm) throws ConfigurationException
+ {
+ KSMetaData ksm = KSMetaData.newKeyspace(cfm.ksName,
+ AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
+ ImmutableMap.of("replication_factor", "1"),
+ true,
+ Collections.singleton(cfm));
+ Schema.instance.load(ksm);
+ }
+
+ /**
* The partitioner to use.
* <p>
* By default, {@code Murmur3Partitioner} will be used. If this is not the partitioner used
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
new file mode 100644
index 0000000..d10c9fb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.Arrays;
+
+import com.google.common.io.Files;
+
+import org.junit.*;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.util.FileUtils;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.junit.Assert.assertTrue;
+
+public class CQLSSTableWriterClientTest
+{
+ private File testDirectory;
+
+ @Before
+ public void setUp()
+ {
+ this.testDirectory = Files.createTempDir();
+ }
+
+ @After
+ public void tearDown()
+ {
+ FileUtils.deleteRecursive(this.testDirectory);
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception
+ {
+ Config.setClientMode(false);
+ }
+
+ @Test
+ public void testWriterInClientMode() throws IOException, InvalidRequestException
+ {
+ final String TABLE1 = "table1";
+ final String TABLE2 = "table2";
+
+ String schema = "CREATE TABLE client_test.%s ("
+ + " k int PRIMARY KEY,"
+ + " v1 text,"
+ + " v2 int"
+ + ")";
+ String insert = "INSERT INTO client_test.%s (k, v1, v2) VALUES (?, ?, ?)";
+
+ CQLSSTableWriter writer = CQLSSTableWriter.builder()
+ .inDirectory(this.testDirectory)
+ .forTable(String.format(schema, TABLE1))
+ .using(String.format(insert, TABLE1)).build();
+
+ CQLSSTableWriter writer2 = CQLSSTableWriter.builder()
+ .inDirectory(this.testDirectory)
+ .forTable(String.format(schema, TABLE2))
+ .using(String.format(insert, TABLE2)).build();
+
+ writer.addRow(0, "A", 0);
+ writer2.addRow(0, "A", 0);
+ writer.addRow(1, "B", 1);
+ writer2.addRow(1, "B", 1);
+ writer.close();
+ writer2.close();
+
+ assertContainsDataFiles(this.testDirectory, "client_test-table1", "client_test-table2");
+ }
+
+ /**
+ * Checks that the specified directory contains the files with the specified prefixes.
+ *
+ * @param directory the directory containing the data files
+ * @param prefixes the file prefixes
+ */
+ private static void assertContainsDataFiles(File directory, String... prefixes)
+ {
+ FilenameFilter filter = new FilenameFilter()
+ {
+ @Override
+ public boolean accept(File dir, String name)
+ {
+ return name.endsWith("-Data.db");
+ }
+ };
+
+ File[] dataFiles = directory.listFiles(filter);
+ Arrays.sort(dataFiles);
+
+ assertEquals(dataFiles.length, prefixes.length);
+ for (int i = 0; i < dataFiles.length; i++)
+ assertTrue(dataFiles[i].toString().contains(prefixes[i]));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index 0f123a4..0922502 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -25,24 +25,25 @@ import java.util.Iterator;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
+
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.OutputHandler;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
public class CQLSSTableWriterTest
{
@BeforeClass
@@ -51,6 +52,12 @@ public class CQLSSTableWriterTest
StorageService.instance.initServer();
}
+ @AfterClass
+ public static void tearDown()
+ {
+ Config.setClientMode(false);
+ }
+
@Test
public void testUnsortedWriter() throws Exception
{
@@ -176,12 +183,12 @@ public class CQLSSTableWriterTest
@Override
public void run()
{
- String schema = "CREATE TABLE cql_keyspace.table2 ("
+ String schema = "CREATE TABLE cql_keyspace2.table2 ("
+ " k int,"
+ " v int,"
+ " PRIMARY KEY (k, v)"
+ ")";
- String insert = "INSERT INTO cql_keyspace.table2 (k, v) VALUES (?, ?)";
+ String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES (?, ?)";
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory(dataDir)
.forTable(schema)
@@ -206,7 +213,7 @@ public class CQLSSTableWriterTest
@Test
public void testConcurrentWriters() throws Exception
{
- String KS = "cql_keyspace";
+ String KS = "cql_keyspace2";
String TABLE = "table2";
File tempdir = Files.createTempDir();
@@ -235,7 +242,7 @@ public class CQLSSTableWriterTest
{
public void init(String keyspace)
{
- for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace"))
+ for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace2"))
addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
setPartitioner(StorageService.getPartitioner());
}
@@ -248,7 +255,7 @@ public class CQLSSTableWriterTest
loader.stream().get();
- UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM cql_keyspace.table2;");
+ UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM cql_keyspace2.table2;");
assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
}
}