You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/03/05 18:52:48 UTC

[3/6] cassandra git commit: Fix CQLSSTableWriter throwing exception and spawning threads

Fix CQLSSTableWriter throwing exception and spawning threads

patch by Benjamin Lerer; reviewed by yukim for CASSANDRA-8808


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fb67c41a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fb67c41a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fb67c41a

Branch: refs/heads/trunk
Commit: fb67c41ad7faeff7a5f33e9e0bca6493a3febe89
Parents: e56d9ef
Author: blerer <be...@datastax.com>
Authored: Tue Mar 3 12:00:57 2015 +0100
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Mar 5 11:41:50 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../cql3/statements/UpdateStatement.java        |  23 ++++
 .../cassandra/io/sstable/CQLSSTableWriter.java  |  50 ++++++--
 .../io/sstable/CQLSSTableWriterClientTest.java  | 116 +++++++++++++++++++
 .../io/sstable/CQLSSTableWriterTest.java        |  31 +++--
 5 files changed, 198 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4e34c9e..faa14d5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,8 @@
  * 'nodetool info' prints exception against older node (CASSANDRA-8796)
  * Ensure SSTableSimpleUnsortedWriter.close() terminates if
    disk writer has crashed (CASSANDRA-8807)
+ * Fix CQLSSTableWriter throwing exception and spawning threads
+   (CASSANDRA-8808)
 
 2.0.12:
  * Use more efficient slice size for querying internal secondary

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 594b5db..f34edaf 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -50,6 +50,15 @@ public class UpdateStatement extends ModificationStatement
     public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
     throws InvalidRequestException
     {
+        addUpdateForKey(cf, key, builder, params, true);
+    }
+
+    public void addUpdateForKey(ColumnFamily cf,
+                                ByteBuffer key,
+                                ColumnNameBuilder builder,
+                                UpdateParameters params,
+                                boolean validateIndexedColumns) throws InvalidRequestException
+    {
         CFDefinition cfDef = cfm.getCfDef();
 
         if (builder.getLength() > FBUtilities.MAX_UNSIGNED_SHORT)
@@ -106,6 +115,20 @@ public class UpdateStatement extends ModificationStatement
                 update.execute(key, cf, builder.copy(), params);
         }
 
+        // validateIndexedColumns triggers a call to Keyspace.open(), which we want to be able to avoid in some cases
+        // (e.g. when using CQLSSTableWriter)
+        if (validateIndexedColumns)
+            validateIndexedColumns(cf);
+    }
+
+    /**
+     * Checks that the value of the indexed columns is valid.
+     *
+     * @param cf the column family
+     * @throws InvalidRequestException if one of the values is invalid
+     */
+    private void validateIndexedColumns(ColumnFamily cf) throws InvalidRequestException
+    {
         SecondaryIndexManager indexManager = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId).indexManager;
         if (indexManager.hasIndexes())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 8006112..fb4c186 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -37,9 +37,9 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.Allocator;
@@ -77,6 +77,11 @@ import org.apache.cassandra.utils.Pair;
  */
 public class CQLSSTableWriter implements Closeable
 {
+    static
+    {
+        Config.setClientMode(true);
+    }
+
     private final AbstractSSTableSimpleWriter writer;
     private final UpdateStatement insert;
     private final List<ColumnSpecification> boundNames;
@@ -215,7 +220,7 @@ public class CQLSSTableWriter implements Closeable
             {
                 if (writer.currentKey() == null || !key.equals(writer.currentKey().key))
                     writer.newRow(key);
-                insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params);
+                insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params, false);
             }
             return this;
         }
@@ -345,19 +350,11 @@ public class CQLSSTableWriter implements Closeable
                     KSMetaData ksm = Schema.instance.getKSMetaData(this.schema.ksName);
                     if (ksm == null)
                     {
-                        ksm = KSMetaData.newKeyspace(this.schema.ksName,
-                                AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
-                                ImmutableMap.of("replication_factor", "1"),
-                                true,
-                                Collections.singleton(this.schema));
-                        Schema.instance.load(ksm);
+                        createKeyspaceWithColumnFamily(this.schema);
                     }
                     else if (Schema.instance.getCFMetaData(this.schema.ksName, this.schema.cfName) == null)
                     {
-                        Schema.instance.load(this.schema);
-                        ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(this.schema)));
-                        Schema.instance.setKeyspaceDefinition(ksm);
-                        Keyspace.open(ksm.name).initCf(this.schema.cfId, this.schema.cfName, false);
+                        addColumnFamilyToKeyspace(ksm, this.schema);
                     }
                     return this;
                 }
@@ -369,6 +366,35 @@ public class CQLSSTableWriter implements Closeable
         }
 
         /**
+         * Adds the specified column family to the specified keyspace.
+         *
+         * @param ksm the keyspace meta data
+         * @param cfm the column family meta data
+         */
+        private static void addColumnFamilyToKeyspace(KSMetaData ksm, CFMetaData cfm)
+        {
+            ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(cfm)));
+            Schema.instance.load(cfm);
+            Schema.instance.setKeyspaceDefinition(ksm);
+        }
+
+        /**
+         * Creates a keyspace for the specified column family.
+         *
+         * @param cfm the column family
+         * @throws ConfigurationException if a problem occurs while creating the keyspace.
+         */
+        private static void createKeyspaceWithColumnFamily(CFMetaData cfm) throws ConfigurationException
+        {
+            KSMetaData ksm = KSMetaData.newKeyspace(cfm.ksName,
+                                                    AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"),
+                                                    ImmutableMap.of("replication_factor", "1"),
+                                                    true,
+                                                    Collections.singleton(cfm));
+            Schema.instance.load(ksm);
+        }
+
+        /**
          * The partitioner to use.
          * <p>
          * By default, {@code Murmur3Partitioner} will be used. If this is not the partitioner used

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
new file mode 100644
index 0000000..d10c9fb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.Arrays;
+
+import com.google.common.io.Files;
+
+import org.junit.*;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.util.FileUtils;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.junit.Assert.assertTrue;
+
+public class CQLSSTableWriterClientTest
+{
+    private File testDirectory;
+
+    @Before
+    public void setUp()
+    {
+        this.testDirectory = Files.createTempDir();
+    }
+
+    @After
+    public void tearDown()
+    {
+        FileUtils.deleteRecursive(this.testDirectory);
+    }
+
+    @AfterClass
+    public static void cleanup() throws Exception
+    {
+        Config.setClientMode(false);
+    }
+
+    @Test
+    public void testWriterInClientMode() throws IOException, InvalidRequestException
+    {
+        final String TABLE1 = "table1";
+        final String TABLE2 = "table2";
+
+        String schema = "CREATE TABLE client_test.%s ("
+                            + "  k int PRIMARY KEY,"
+                            + "  v1 text,"
+                            + "  v2 int"
+                            + ")";
+        String insert = "INSERT INTO client_test.%s (k, v1, v2) VALUES (?, ?, ?)";
+
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .inDirectory(this.testDirectory)
+                                                  .forTable(String.format(schema, TABLE1))
+                                                  .using(String.format(insert, TABLE1)).build();
+
+        CQLSSTableWriter writer2 = CQLSSTableWriter.builder()
+                                                   .inDirectory(this.testDirectory)
+                                                   .forTable(String.format(schema, TABLE2))
+                                                   .using(String.format(insert, TABLE2)).build();
+
+        writer.addRow(0, "A", 0);
+        writer2.addRow(0, "A", 0);
+        writer.addRow(1, "B", 1);
+        writer2.addRow(1, "B", 1);
+        writer.close();
+        writer2.close();
+
+        assertContainsDataFiles(this.testDirectory, "client_test-table1", "client_test-table2");
+    }
+
+    /**
+     * Checks that the <code>-Data.db</code> files of the specified directory match the specified prefixes.
+     *
+     * @param directory the directory containing the data files
+     * @param prefixes the expected file prefixes, in the sorted order of the data files
+     */
+    private static void assertContainsDataFiles(File directory, String... prefixes)
+    {
+        FilenameFilter filter = new FilenameFilter()
+        {
+            @Override
+            public boolean accept(File dir, String name)
+            {
+                return name.endsWith("-Data.db");
+            }
+        };
+
+        File[] dataFiles = directory.listFiles(filter);
+        Arrays.sort(dataFiles); // sorted so that dataFiles[i] lines up with prefixes[i]
+
+        assertEquals(prefixes.length, dataFiles.length); // JUnit order: expected first, then actual
+        for (int i = 0; i < dataFiles.length; i++)
+            assertTrue(dataFiles[i].toString().contains(prefixes[i]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index 0f123a4..0922502 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -25,24 +25,25 @@ import java.util.Iterator;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
+
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.OutputHandler;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
 public class CQLSSTableWriterTest
 {
     @BeforeClass
@@ -51,6 +52,12 @@ public class CQLSSTableWriterTest
         StorageService.instance.initServer();
     }
 
+    @AfterClass
+    public static void tearDown()
+    {
+        Config.setClientMode(false);
+    }
+
     @Test
     public void testUnsortedWriter() throws Exception
     {
@@ -176,12 +183,12 @@ public class CQLSSTableWriterTest
         @Override
         public void run()
         {
-            String schema = "CREATE TABLE cql_keyspace.table2 ("
+            String schema = "CREATE TABLE cql_keyspace2.table2 ("
                     + "  k int,"
                     + "  v int,"
                     + "  PRIMARY KEY (k, v)"
                     + ")";
-            String insert = "INSERT INTO cql_keyspace.table2 (k, v) VALUES (?, ?)";
+            String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES (?, ?)";
             CQLSSTableWriter writer = CQLSSTableWriter.builder()
                     .inDirectory(dataDir)
                     .forTable(schema)
@@ -206,7 +213,7 @@ public class CQLSSTableWriterTest
     @Test
     public void testConcurrentWriters() throws Exception
     {
-        String KS = "cql_keyspace";
+        String KS = "cql_keyspace2";
         String TABLE = "table2";
 
         File tempdir = Files.createTempDir();
@@ -235,7 +242,7 @@ public class CQLSSTableWriterTest
         {
             public void init(String keyspace)
             {
-                for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace"))
+                for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace2"))
                     addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
                 setPartitioner(StorageService.getPartitioner());
             }
@@ -248,7 +255,7 @@ public class CQLSSTableWriterTest
 
         loader.stream().get();
 
-        UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM cql_keyspace.table2;");
+        UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM cql_keyspace2.table2;");
         assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
     }
 }