You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/12/15 20:15:54 UTC

cassandra git commit: sstableloader will fail if there are collections in the schema tables

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 cee35e42d -> 994250c8d


sstableloader will fail if there are collections in the schema tables

Fix and new testcase

Patch by tjake; reviewed by Aleksey Yeschenko for CASSANDRA-10700


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/994250c8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/994250c8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/994250c8

Branch: refs/heads/cassandra-2.1
Commit: 994250c8d38b3b4299f2e33ebe405ff601b5ae85
Parents: cee35e4
Author: T Jake Luciani <ja...@apache.org>
Authored: Mon Dec 14 11:40:53 2015 -0500
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Dec 15 14:15:12 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 .../org/apache/cassandra/tools/BulkLoader.java  |  31 +++-
 .../apache/cassandra/tools/BulkLoaderTest.java  | 172 +++++++++++++++++++
 3 files changed, 202 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/994250c8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7f1d66b..8e58703 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,8 +1,12 @@
 2.1.13
+<<<<<<< HEAD
  * Allow CREATE TABLE WITH ID (CASSANDRA-9179)
  * Make Stress compiles within eclipse (CASSANDRA-10807)
  * Cassandra Daemon should print JVM arguments (CASSANDRA-10764)
  * Allow cancellation of index summary redistribution (CASSANDRA-8805)
+=======
+ * sstableloader will fail if there are collections in the schema tables (CASSANDRA-10700)
+>>>>>>> 5377183... stableloader will fail if there are collections in the schema tables
  * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
  * Fix Stress profile parsing on Windows (CASSANDRA-10808)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/994250c8/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index f4b30cb..96e826d 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -24,9 +24,10 @@ import java.util.*;
 import com.google.common.base.Joiner;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
 
 import org.apache.commons.cli.*;
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TTransport;
@@ -310,10 +311,11 @@ public class BulkLoader
                         }
                     }
 
-                    String cfQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = '%s'",
-                                                 Keyspace.SYSTEM_KS,
-                                                 SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF,
-                                                 keyspace);
+                    String cfQuery = String.format("SELECT %s FROM %s.%s WHERE keyspace_name = '%s'",
+                                                   StringUtils.join(getCFColumnsWithoutCollections(), ","),
+                                                   Keyspace.SYSTEM_KS,
+                                                   SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF,
+                                                   keyspace);
                     CqlResult cfRes = client.execute_cql3_query(ByteBufferUtil.bytes(cfQuery), Compression.NONE, ConsistencyLevel.ONE);
 
 
@@ -340,6 +342,25 @@ public class BulkLoader
             }
         }
 
+        // Names of the SchemaColumnFamiliesCf columns that are not collections. Collections (such as
+        // dropped_columns) can't be parsed in v2, which is what thrift uses. See CASSANDRA-10700
+        List<String> getCFColumnsWithoutCollections()
+        {
+            List<String> names = new ArrayList<>();
+
+            for (Iterator<ColumnDefinition> it = CFMetaData.SchemaColumnFamiliesCf.allColumnsInSelectOrder(); it.hasNext(); )
+            {
+                ColumnDefinition column = it.next();
+                // skip anything collection-typed; everything else is selected by name
+                if (column.type.isCollection())
+                    continue;
+
+                names.add(UTF8Type.instance.getString(column.name.bytes));
+            }
+
+            return names;
+        }
+
         @Override
         public StreamConnectionFactory getConnectionFactory()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/994250c8/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java b/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java
new file mode 100644
index 0000000..dcdb7eb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+
+import java.io.File;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.TFramedTransportFactory;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Streams sstables produced by {@link CQLSSTableWriter} into an embedded node via
+ * {@link BulkLoader.ExternalClient}, including a table with a dropped column (CASSANDRA-10700).
+ */
+public class BulkLoaderTest
+{
+    private static final String KS = "cql_keyspace";
+    static EmbeddedCassandraService embeddedCassandraService = new EmbeddedCassandraService();
+
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        SchemaLoader.cleanupAndLeaveDirs();
+        embeddedCassandraService.start();
+
+        QueryProcessor.executeInternal("CREATE KEYSPACE cql_keyspace WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
+    }
+
+    /**
+     * Creates and returns the directory {@code <tmpdir>/<keyspace>/<table>} for the sstables of a test.
+     * The outcome of mkdirs() is checked with a JUnit assertion instead of the {@code assert} keyword,
+     * so the directory is still created (and checked) when the JVM runs with assertions disabled.
+     */
+    private static File createDataDir(String table)
+    {
+        File tempdir = Files.createTempDir();
+        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + table);
+        org.junit.Assert.assertTrue("Could not create " + dataDir, dataDir.mkdirs());
+        return dataDir;
+    }
+
+    /** Builds the client given to SSTableLoader: local address, with the ports taken from DatabaseDescriptor. */
+    private static BulkLoader.ExternalClient newClient()
+    {
+        return new BulkLoader.ExternalClient(Sets.newHashSet(FBUtilities.getLocalAddress()),
+                                             DatabaseDescriptor.getRpcPort(),
+                                             null, null, new TFramedTransportFactory(),
+                                             DatabaseDescriptor.getStoragePort(),
+                                             DatabaseDescriptor.getSSLStoragePort(), null);
+    }
+
+    /** Writes four rows to sstables for a plain table, streams them in and expects all four back. */
+    @Test
+    public void testClientWriter() throws Exception
+    {
+        File dataDir = createDataDir("table2");
+
+        String schema = "CREATE TABLE cql_keyspace.table2 ("
+                        + "  k int PRIMARY KEY,"
+                        + "  v1 text,"
+                        + "  v2 int"
+                        + ")";
+
+        QueryProcessor.executeInternal(schema);
+
+        String insert = "INSERT INTO cql_keyspace.table2 (k, v1, v2) VALUES (?, ?, ?)";
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .inDirectory(dataDir)
+                                                  .forTable(schema)
+                                                  .withPartitioner(StorageService.instance.getPartitioner())
+                                                  .using(insert).build();
+
+        writer.addRow(0, "test1", 24);
+        writer.addRow(1, "test2", null);
+        writer.addRow(2, "test3", 42);
+        writer.addRow(ImmutableMap.<String, Object>of("k", 3, "v2", 12));
+        writer.close();
+
+        SSTableLoader loader = new SSTableLoader(dataDir, newClient(), new OutputHandler.SystemOutput(false, false));
+        loader.stream().get();
+
+        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table2;");
+        assertEquals(4, rs.size());
+    }
+
+    /**
+     * Same round trip for a table that has a list column and has had a column dropped, then checks that
+     * the client could read the table's metadata (the CASSANDRA-10700 case).
+     */
+    @Test
+    public void testClientWriterWithDroppedColumn() throws Exception
+    {
+        String TABLE = "table3";
+        File dataDir = createDataDir(TABLE);
+
+        String schemaToDrop = "CREATE TABLE cql_keyspace.table3 ("
+                            + "  k int PRIMARY KEY,"
+                            + "  v1 text,"
+                            + "  v2 int,"
+                            + "  v3 list<int>,"
+                            + "  v4 text"
+                            + ")";
+
+        QueryProcessor.executeInternal(schemaToDrop);
+        QueryProcessor.executeInternal("ALTER TABLE cql_keyspace.table3 DROP v4");
+
+        // the sstables are written against the schema as it looks after the drop
+        String schema = "CREATE TABLE cql_keyspace.table3 ("
+                        + "  k int PRIMARY KEY,"
+                        + "  v1 text,"
+                        + "  v2 int,"
+                        + "  v3 list<int>"
+                        + ")";
+
+        String insert = "INSERT INTO cql_keyspace.table3 (k, v1, v2, v3) VALUES (?, ?, ?, ?)";
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .inDirectory(dataDir)
+                                                  .forTable(schema)
+                                                  .withPartitioner(StorageService.instance.getPartitioner())
+                                                  .using(insert).build();
+
+        writer.addRow(0, "test1", 24, Lists.newArrayList(4));
+        writer.addRow(1, "test2", null, Lists.newArrayList(4,4,5));
+        writer.addRow(2, "test3", 42, null);
+        writer.close();
+
+        BulkLoader.ExternalClient client = newClient();
+        SSTableLoader loader = new SSTableLoader(dataDir, client, new OutputHandler.SystemOutput(false, false));
+        loader.stream().get();
+
+        CFMetaData cfMetaData = client.getCFMetaData(KS, TABLE);
+        org.junit.Assert.assertNotNull(cfMetaData);
+
+        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table3;");
+        assertEquals(3, rs.size());
+    }
+}