You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/12/15 20:15:54 UTC
cassandra git commit: stableloader will fail if there are collections
in the schema tables
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 cee35e42d -> 994250c8d
stableloader will fail if there are collections in the schema tables
Fix and new testcase
Patch by tjake; reviewed by Aleksey Yeschenko for CASSANDRA-10700
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/994250c8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/994250c8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/994250c8
Branch: refs/heads/cassandra-2.1
Commit: 994250c8d38b3b4299f2e33ebe405ff601b5ae85
Parents: cee35e4
Author: T Jake Luciani <ja...@apache.org>
Authored: Mon Dec 14 11:40:53 2015 -0500
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Dec 15 14:15:12 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../org/apache/cassandra/tools/BulkLoader.java | 31 +++-
.../apache/cassandra/tools/BulkLoaderTest.java | 172 +++++++++++++++++++
3 files changed, 202 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/994250c8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7f1d66b..8e58703 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,8 +1,12 @@
2.1.13
+<<<<<<< HEAD
* Allow CREATE TABLE WITH ID (CASSANDRA-9179)
* Make Stress compiles within eclipse (CASSANDRA-10807)
* Cassandra Daemon should print JVM arguments (CASSANDRA-10764)
* Allow cancellation of index summary redistribution (CASSANDRA-8805)
+=======
+ * sstableloader will fail if there are collections in the schema tables (CASSANDRA-10700)
+>>>>>>> 5377183... stableloader will fail if there are collections in the schema tables
* Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
* Fix Stress profile parsing on Windows (CASSANDRA-10808)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/994250c8/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index f4b30cb..96e826d 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -24,9 +24,10 @@ import java.util.*;
import com.google.common.base.Joiner;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
import org.apache.commons.cli.*;
+import org.apache.commons.lang3.StringUtils;
+
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransport;
@@ -310,10 +311,11 @@ public class BulkLoader
}
}
- String cfQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = '%s'",
- Keyspace.SYSTEM_KS,
- SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF,
- keyspace);
+ String cfQuery = String.format("SELECT %s FROM %s.%s WHERE keyspace_name = '%s'",
+ StringUtils.join(getCFColumnsWithoutCollections(), ","),
+ Keyspace.SYSTEM_KS,
+ SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF,
+ keyspace);
CqlResult cfRes = client.execute_cql3_query(ByteBufferUtil.bytes(cfQuery), Compression.NONE, ConsistencyLevel.ONE);
@@ -340,6 +342,25 @@ public class BulkLoader
}
}
+ //Remove dropped_columns since we can't parse collections in v2 which is used by thrift
+ //See CASSANDRA-10700
+ List<String> getCFColumnsWithoutCollections()
+ {
+
+ Iterator<ColumnDefinition> allColumns = CFMetaData.SchemaColumnFamiliesCf.allColumnsInSelectOrder();
+ List<String> selectedColumns = new ArrayList<>();
+
+ while (allColumns.hasNext())
+ {
+ ColumnDefinition def = allColumns.next();
+
+ if (!def.type.isCollection())
+ selectedColumns.add(UTF8Type.instance.getString(def.name.bytes));
+ }
+
+ return selectedColumns;
+ }
+
@Override
public StreamConnectionFactory getConnectionFactory()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/994250c8/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java b/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java
new file mode 100644
index 0000000..dcdb7eb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+
+import java.io.File;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.TFramedTransportFactory;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+
+import static org.junit.Assert.assertEquals;
+
+public class BulkLoaderTest
+{
+
+ static EmbeddedCassandraService embeddedCassandraService = new EmbeddedCassandraService();
+
+ @BeforeClass
+ public static void setup() throws Exception
+ {
+ SchemaLoader.cleanupAndLeaveDirs();
+ embeddedCassandraService.start();
+
+
+ QueryProcessor.executeInternal("CREATE KEYSPACE cql_keyspace WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
+ }
+
+
+ @Test
+ public void testClientWriter() throws Exception
+ {
+ String KS = "cql_keyspace";
+ String TABLE = "table2";
+
+ File tempdir = Files.createTempDir();
+ File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+ assert dataDir.mkdirs();
+
+ String schema = "CREATE TABLE cql_keyspace.table2 ("
+ + " k int PRIMARY KEY,"
+ + " v1 text,"
+ + " v2 int"
+ + ")";
+
+ QueryProcessor.executeInternal(schema);
+
+ String insert = "INSERT INTO cql_keyspace.table2 (k, v1, v2) VALUES (?, ?, ?)";
+ CQLSSTableWriter writer = CQLSSTableWriter.builder()
+ .inDirectory(dataDir)
+ .forTable(schema)
+ .withPartitioner(StorageService.instance.getPartitioner())
+ .using(insert).build();
+
+ writer.addRow(0, "test1", 24);
+ writer.addRow(1, "test2", null);
+ writer.addRow(2, "test3", 42);
+ writer.addRow(ImmutableMap.<String, Object>of("k", 3, "v2", 12));
+ writer.close();
+
+ BulkLoader.ExternalClient client = new BulkLoader.ExternalClient(Sets.newHashSet(FBUtilities.getLocalAddress()),
+ DatabaseDescriptor.getRpcPort(),
+ null, null, new TFramedTransportFactory(),
+ DatabaseDescriptor.getStoragePort(),
+ DatabaseDescriptor.getSSLStoragePort(), null);
+
+
+ SSTableLoader loader = new SSTableLoader(dataDir, client, new OutputHandler.SystemOutput(false, false));
+
+
+
+ loader.stream().get();
+
+ UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table2;");
+ assertEquals(4, rs.size());
+ }
+
+
+ @Test
+ public void testClientWriterWithDroppedColumn() throws Exception
+ {
+ String KS = "cql_keyspace";
+ String TABLE = "table3";
+
+ File tempdir = Files.createTempDir();
+ File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+ assert dataDir.mkdirs();
+
+ String schemaToDrop = "CREATE TABLE cql_keyspace.table3 ("
+ + " k int PRIMARY KEY,"
+ + " v1 text,"
+ + " v2 int,"
+ + " v3 list<int>,"
+ + " v4 text"
+ + ")";
+
+ QueryProcessor.executeInternal(schemaToDrop);
+ QueryProcessor.executeInternal("ALTER TABLE cql_keyspace.table3 DROP v4");
+
+
+ String schema = "CREATE TABLE cql_keyspace.table3 ("
+ + " k int PRIMARY KEY,"
+ + " v1 text,"
+ + " v2 int,"
+ + " v3 list<int>"
+ + ")";
+
+
+ String insert = "INSERT INTO cql_keyspace.table3 (k, v1, v2, v3) VALUES (?, ?, ?, ?)";
+ CQLSSTableWriter writer = CQLSSTableWriter.builder()
+ .inDirectory(dataDir)
+ .forTable(schema)
+ .withPartitioner(StorageService.instance.getPartitioner())
+ .using(insert).build();
+
+ writer.addRow(0, "test1", 24, Lists.newArrayList(4));
+ writer.addRow(1, "test2", null, Lists.newArrayList(4,4,5));
+ writer.addRow(2, "test3", 42, null);
+ writer.close();
+
+ BulkLoader.ExternalClient client = new BulkLoader.ExternalClient(Sets.newHashSet(FBUtilities.getLocalAddress()),
+ DatabaseDescriptor.getRpcPort(),
+ null, null, new TFramedTransportFactory(),
+ DatabaseDescriptor.getStoragePort(),
+ DatabaseDescriptor.getSSLStoragePort(), null);
+
+ SSTableLoader loader = new SSTableLoader(dataDir, client, new OutputHandler.SystemOutput(false, false));
+
+
+ loader.stream().get();
+
+
+ CFMetaData cfMetaData = client.getCFMetaData(KS, TABLE);
+ assert cfMetaData != null;
+
+ UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table3;");
+ assertEquals(3, rs.size());
+ }
+
+}