You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/08/11 20:03:56 UTC
[01/14] git commit: Fix validation when adding static columns
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 bd0bb6df4 -> 52df514dd
refs/heads/cassandra-2.1 6eb76a212 -> 0c6078a70
refs/heads/cassandra-2.1.0 ce96a2a12 -> 1d744b5d4
refs/heads/trunk c7e191ba1 -> 755d345fe
Fix validation when adding static columns
patch by slebresne; reviewed by iamaleksey for CASSANDRA-7730
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73b02d67
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73b02d67
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73b02d67
Branch: refs/heads/trunk
Commit: 73b02d67c1fcea9a2f773251a0a525ec51b7477a
Parents: 2692c29
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Aug 11 17:44:14 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Aug 11 17:45:21 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/cql3/statements/AlterTableStatement.java | 9 +++++++--
2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73b02d67/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d6976db..4a3e086 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.10
+ * Fix validation when adding static columns (CASSANDRA-7730)
* (Thrift) fix range deletion of supercolumns (CASSANDRA-7733)
* Fix potential AssertionError in RangeTombstoneList (CASSANDRA-7700)
* Validate arguments of blobAs* functions (CASSANDRA-7707)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73b02d67/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 85b3547..88f0de8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -89,8 +89,13 @@ public class AlterTableStatement extends SchemaAlteringStatement
if (cfDef.isCompact)
throw new InvalidRequestException("Cannot add new column to a COMPACT STORAGE table");
- if (isStatic && !cfDef.isComposite)
- throw new InvalidRequestException("Static columns are not allowed in COMPACT STORAGE tables");
+ if (isStatic)
+ {
+ if (!cfDef.isComposite)
+ throw new InvalidRequestException("Static columns are not allowed in COMPACT STORAGE tables");
+ if (cfDef.clusteringColumns().isEmpty())
+ throw new InvalidRequestException("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
+ }
if (name != null)
{
[03/14] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0
Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1.0
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ce96a2a1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ce96a2a1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ce96a2a1
Branch: refs/heads/trunk
Commit: ce96a2a120508ba0a31cc35e44905eae808f4be3
Parents: 4c387e4 bd0bb6d
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Aug 11 18:23:43 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Aug 11 18:23:43 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../cql3/statements/AlterTableStatement.java | 23 +++++-
.../apache/cassandra/cql3/AlterTableTest.java | 86 ++++++++++++++++++++
3 files changed, 110 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce96a2a1/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 910feb4,cd51e04..64b4d65
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,7 +1,14 @@@
-2.0.10
+2.1.0-rc6
+ * Include stress yaml example in release and deb (CASSANDRA-7717)
+ * workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695)
+ * cqlsh DESC CLUSTER fails retrieving ring information (CASSANDRA-7687)
+ * Fix binding null values inside UDT (CASSANDRA-7685)
+ * Fix UDT field selection with empty fields (CASSANDRA-7670)
+ * Bogus deserialization of static cells from sstable (CASSANDRA-7684)
+Merged from 2.0:
+ * Better error message when adding a collection with the same name
+ than a previously dropped one (CASSANDRA-6276)
+ * Fix validation when adding static columns (CASSANDRA-7730)
* (Thrift) fix range deletion of supercolumns (CASSANDRA-7733)
* Fix potential AssertionError in RangeTombstoneList (CASSANDRA-7700)
* Validate arguments of blobAs* functions (CASSANDRA-7707)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce96a2a1/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 273ee11,136c430..be28943
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@@ -85,16 -86,23 +85,23 @@@ public class AlterTableStatement extend
switch (oType)
{
case ADD:
- if (cfDef.isCompact)
+ if (cfm.comparator.isDense())
throw new InvalidRequestException("Cannot add new column to a COMPACT STORAGE table");
- if (isStatic && !cfm.comparator.isCompound())
- throw new InvalidRequestException("Static columns are not allowed in COMPACT STORAGE tables");
+
+ if (isStatic)
+ {
- if (!cfDef.isComposite)
++ if (!cfm.comparator.isCompound())
+ throw new InvalidRequestException("Static columns are not allowed in COMPACT STORAGE tables");
- if (cfDef.clusteringColumns().isEmpty())
++ if (cfm.clusteringColumns().isEmpty())
+ throw new InvalidRequestException("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
+ }
+
- if (name != null)
+ if (def != null)
{
- switch (name.kind)
+ switch (def.kind)
{
- case KEY_ALIAS:
- case COLUMN_ALIAS:
+ case PARTITION_KEY:
+ case CLUSTERING_COLUMN:
throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with a PRIMARY KEY part", columnName));
default:
throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with an existing column", columnName));
@@@ -104,18 -112,39 +111,30 @@@
AbstractType<?> type = validator.getType();
if (type instanceof CollectionType)
{
- if (!cfDef.isComposite)
+ if (!cfm.comparator.supportCollections())
throw new InvalidRequestException("Cannot use collection types with non-composite PRIMARY KEY");
- if (cfDef.cfm.isSuper())
+ if (cfm.isSuper())
throw new InvalidRequestException("Cannot use collection types with Super column family");
- Map<ByteBuffer, CollectionType> collections = cfDef.hasCollections
- ? new HashMap<ByteBuffer, CollectionType>(cfDef.getCollectionType().defined)
- : new HashMap<ByteBuffer, CollectionType>();
+
+ // If there used to be a collection column with the same name (that has been dropped), it will
+ // still appear in the ColumnToCollectionType for reasons explained on #6276. The same
+ // reasons mean that we can't allow adding a new collection with that name (see the ticket for details).
- CollectionType previous = collections.get(columnName.key);
- if (previous != null && !type.isCompatibleWith(previous))
- throw new InvalidRequestException(String.format("Cannot add a collection with the name %s " +
- "because a collection with the same name and a different type has already been used in the past", columnName));
++ if (cfm.comparator.hasCollections())
++ {
++ CollectionType previous = cfm.comparator.collectionType() == null ? null : cfm.comparator.collectionType().defined.get(columnName.bytes);
++ if (previous != null && !type.isCompatibleWith(previous))
++ throw new InvalidRequestException(String.format("Cannot add a collection with the name %s " +
++ "because a collection with the same name and a different type has already been used in the past", columnName));
++ }
+
- collections.put(columnName.key, (CollectionType)type);
- ColumnToCollectionType newColType = ColumnToCollectionType.getInstance(collections);
- List<AbstractType<?>> ctypes = new ArrayList<AbstractType<?>>(((CompositeType)cfm.comparator).types);
- if (cfDef.hasCollections)
- ctypes.set(ctypes.size() - 1, newColType);
- else
- ctypes.add(newColType);
- cfm.comparator = CompositeType.getInstance(ctypes);
+ cfm.comparator = cfm.comparator.addOrUpdateCollection(columnName, (CollectionType)type);
}
- Integer componentIndex = cfDef.isComposite
- ? ((CompositeType)meta.comparator).types.size() - (cfDef.hasCollections ? 2 : 1)
- : null;
+ Integer componentIndex = cfm.comparator.isCompound() ? cfm.comparator.clusteringPrefixSize() : null;
cfm.addColumnDefinition(isStatic
- ? ColumnDefinition.staticDef(columnName.key, type, componentIndex)
- : ColumnDefinition.regularDef(columnName.key, type, componentIndex));
+ ? ColumnDefinition.staticDef(cfm, columnName.bytes, type, componentIndex)
+ : ColumnDefinition.regularDef(cfm, columnName.bytes, type, componentIndex));
break;
case ALTER:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce96a2a1/test/unit/org/apache/cassandra/cql3/AlterTableTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/AlterTableTest.java
index 0000000,0000000..f5747ed
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/AlterTableTest.java
@@@ -1,0 -1,0 +1,86 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.cassandra.cql3;
++
++import org.junit.Test;
++
++import org.apache.cassandra.exceptions.InvalidRequestException;
++
++public class AlterTableTest extends CQLTester
++{
++ @Test
++ public void testAddList() throws Throwable
++ {
++ createTable("CREATE TABLE %s (id text PRIMARY KEY, content text);");
++ execute("ALTER TABLE %s ADD myCollection list<text>;");
++ execute("INSERT INTO %s (id, content , myCollection) VALUES ('test', 'first test', ['first element']);");
++
++ assertRows(execute("SELECT * FROM %s;"), row("test", "first test", list("first element")));
++ }
++
++ @Test
++ public void testDropList() throws Throwable
++ {
++ createTable("CREATE TABLE %s (id text PRIMARY KEY, content text, myCollection list<text>);");
++ execute("INSERT INTO %s (id, content , myCollection) VALUES ('test', 'first test', ['first element']);");
++ execute("ALTER TABLE %s DROP myCollection;");
++
++ assertRows(execute("SELECT * FROM %s;"), row("test", "first test"));
++ }
++ @Test
++ public void testAddMap() throws Throwable
++ {
++ createTable("CREATE TABLE %s (id text PRIMARY KEY, content text);");
++ execute("ALTER TABLE %s ADD myCollection map<text, text>;");
++ execute("INSERT INTO %s (id, content , myCollection) VALUES ('test', 'first test', { '1' : 'first element'});");
++
++ assertRows(execute("SELECT * FROM %s;"), row("test", "first test", map("1", "first element")));
++ }
++
++ @Test
++ public void testDropMap() throws Throwable
++ {
++ createTable("CREATE TABLE %s (id text PRIMARY KEY, content text, myCollection map<text, text>);");
++ execute("INSERT INTO %s (id, content , myCollection) VALUES ('test', 'first test', { '1' : 'first element'});");
++ execute("ALTER TABLE %s DROP myCollection;");
++
++ assertRows(execute("SELECT * FROM %s;"), row("test", "first test"));
++ }
++
++ @Test
++ public void testDropListAndAddListWithSameName() throws Throwable
++ {
++ createTable("CREATE TABLE %s (id text PRIMARY KEY, content text, myCollection list<text>);");
++ execute("INSERT INTO %s (id, content , myCollection) VALUES ('test', 'first test', ['first element']);");
++ execute("ALTER TABLE %s DROP myCollection;");
++ execute("ALTER TABLE %s ADD myCollection list<text>;");
++
++ assertRows(execute("SELECT * FROM %s;"), row("test", "first test", null));
++ execute("UPDATE %s set myCollection = ['second element'] WHERE id = 'test';");
++ assertRows(execute("SELECT * FROM %s;"), row("test", "first test", list("second element")));
++ }
++ @Test
++ public void testDropListAndAddMapWithSameName() throws Throwable
++ {
++ createTable("CREATE TABLE %s (id text PRIMARY KEY, content text, myCollection list<text>);");
++ execute("INSERT INTO %s (id, content , myCollection) VALUES ('test', 'first test', ['first element']);");
++ execute("ALTER TABLE %s DROP myCollection;");
++
++ assertInvalid("ALTER TABLE %s ADD myCollection map<int, int>;");
++ }
++}
[02/14] git commit: Better error message when trying to add a
collection with the same name than a previously dropped one
Posted by br...@apache.org.
Better error message when trying to add a collection with the same name than a previously dropped one
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd0bb6df
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd0bb6df
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd0bb6df
Branch: refs/heads/trunk
Commit: bd0bb6df4613588967ab0c67c268a231c112b321
Parents: 73b02d6
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Aug 11 18:07:35 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Aug 11 18:07:35 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../cassandra/cql3/statements/AlterTableStatement.java | 8 ++++++++
2 files changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd0bb6df/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a3e086..cd51e04 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.0.10
+ * Better error message when adding a collection with the same name
+ than a previously dropped one (CASSANDRA-6276)
* Fix validation when adding static columns (CASSANDRA-7730)
* (Thrift) fix range deletion of supercolumns (CASSANDRA-7733)
* Fix potential AssertionError in RangeTombstoneList (CASSANDRA-7700)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd0bb6df/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 88f0de8..136c430 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -121,6 +121,14 @@ public class AlterTableStatement extends SchemaAlteringStatement
? new HashMap<ByteBuffer, CollectionType>(cfDef.getCollectionType().defined)
: new HashMap<ByteBuffer, CollectionType>();
+ // If there used to be a collection column with the same name (that has been dropped), it will
+ // still appear in the ColumnToCollectionType for reasons explained on #6276. The same
+ // reasons mean that we can't allow adding a new collection with that name (see the ticket for details).
+ CollectionType previous = collections.get(columnName.key);
+ if (previous != null && !type.isCompatibleWith(previous))
+ throw new InvalidRequestException(String.format("Cannot add a collection with the name %s " +
+ "because a collection with the same name and a different type has already been used in the past", columnName));
+
collections.put(columnName.key, (CollectionType)type);
ColumnToCollectionType newColType = ColumnToCollectionType.getInstance(collections);
List<AbstractType<?>> ctypes = new ArrayList<AbstractType<?>>(((CompositeType)cfm.comparator).types);
[13/14] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1
Posted by br...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c6078a7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c6078a7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c6078a7
Branch: refs/heads/cassandra-2.1
Commit: 0c6078a70cfa83f851c1d2b9ec039e2f00161bfc
Parents: 6eb76a2 1d744b5
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:03:32 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:03:32 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 138 ++++++++++++++++++-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 15 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 4 +-
5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c6078a7/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c6078a7/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c6078a7/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
[04/14] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1
Posted by br...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6eb76a21
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6eb76a21
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6eb76a21
Branch: refs/heads/trunk
Commit: 6eb76a2122c276bd252196b9ffc18764dfae4e3e
Parents: 50ee3a7 ce96a2a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Aug 11 18:24:03 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Aug 11 18:24:03 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../cql3/statements/AlterTableStatement.java | 23 +++++-
.../apache/cassandra/cql3/AlterTableTest.java | 86 ++++++++++++++++++++
3 files changed, 110 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6eb76a21/CHANGES.txt
----------------------------------------------------------------------
[14/14] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by br...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/755d345f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/755d345f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/755d345f
Branch: refs/heads/trunk
Commit: 755d345fefafa1384e26269e6381e5b517216a42
Parents: c7e191b 0c6078a
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:03:47 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:03:47 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../cql3/statements/AlterTableStatement.java | 23 +++-
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 138 ++++++++++++++++++-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 15 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 4 +-
.../apache/cassandra/cql3/AlterTableTest.java | 86 ++++++++++++
7 files changed, 260 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/755d345f/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/755d345f/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/755d345f/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/755d345f/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
[07/14] git commit: Give CRR a default input_cql Statement
Posted by br...@apache.org.
Give CRR a default input_cql Statement
Patch by Mike Adamson, reviewed by brandonwilliams for CASSANDRA-7226
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52df514d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52df514d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52df514d
Branch: refs/heads/cassandra-2.0
Commit: 52df514dd1a95d4fc4d699d6ffa9d3bf7e844854
Parents: bd0bb6d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:00:39 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:02:34 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 138 ++++++++++++++++++-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 15 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 4 +-
5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd51e04..ddf4627 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.10
+ * Give CRR a default input_cql Statement (CASSANDRA-7226)
* Better error message when adding a collection with the same name
than a previously dropped one (CASSANDRA-6276)
* Fix validation when adding static columns (CASSANDRA-7730)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 7a5fd47..b2c8fbf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -60,7 +60,7 @@ import com.google.common.collect.Sets;
public class CqlConfigHelper
{
- private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
+ private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns";
private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
private static final String INPUT_CQL = "cassandra.input.cql";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 88c5c33..74310cf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -24,9 +24,17 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
import org.apache.cassandra.hadoop.HadoopCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +71,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
{
private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
+ public static final int DEFAULT_CQL_PAGE_LIMIT = 1000;
+
private ColumnFamilySplit split;
private RowIterator rowIterator;
@@ -74,6 +84,12 @@ public class CqlRecordReader extends RecordReader<Long, Row>
private Cluster cluster;
private Session session;
private IPartitioner partitioner;
+ private String inputColumns;
+ private String userDefinedWhereClauses;
+ private int pageRowSize;
+
+ private List<String> partitionKeys = new ArrayList<>();
+ private List<String> clusteringKeys = new ArrayList<>();
// partition keys -- key aliases
private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
@@ -92,8 +108,18 @@ public class CqlRecordReader extends RecordReader<Long, Row>
: ConfigHelper.getInputSplitSize(conf);
cfName = quote(ConfigHelper.getInputColumnFamily(conf));
keyspace = quote(ConfigHelper.getInputKeyspace(conf));
- cqlQuery = CqlConfigHelper.getInputCql(conf);
- partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context));
+ partitioner = ConfigHelper.getInputPartitioner(conf);
+ inputColumns = CqlConfigHelper.getInputcolumns(conf);
+ userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
+ Optional<Integer> pageRowSizeOptional = CqlConfigHelper.getInputPageRowSize(conf);
+ try
+ {
+ pageRowSize = pageRowSizeOptional.isPresent() ? pageRowSizeOptional.get() : DEFAULT_CQL_PAGE_LIMIT;
+ }
+ catch(NumberFormatException e)
+ {
+ pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
+ }
try
{
if (cluster != null)
@@ -125,6 +151,20 @@ public class CqlRecordReader extends RecordReader<Long, Row>
if (cluster != null)
session = cluster.connect(keyspace);
+
+ if (session == null)
+ throw new RuntimeException("Can't create connection session");
+
+ // If the user provides a CQL query then we will use it without validation
+ // otherwise we will fall back to building a query using the:
+ // inputColumns
+ // whereClauses
+ // pageRowSize
+ cqlQuery = CqlConfigHelper.getInputCql(conf);
+ if (StringUtils.isEmpty(cqlQuery))
+ cqlQuery = buildQuery();
+ logger.debug("cqlQuery {}", cqlQuery);
+
rowIterator = new RowIterator();
logger.debug("created {}", rowIterator);
}
@@ -224,9 +264,6 @@ public class CqlRecordReader extends RecordReader<Long, Row>
public RowIterator()
{
- if (session == null)
- throw new RuntimeException("Can't create connection session");
-
AbstractType type = partitioner.getTokenValidator();
ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
@@ -487,6 +524,97 @@ public class CqlRecordReader extends RecordReader<Long, Row>
}
}
+ /**
+ * Build a query for the reader of the form:
+ *
+ * SELECT * FROM ks.cf WHERE token(pk1,...pkn)>? AND token(pk1,...pkn)<=? [AND user where clauses]
+ * LIMIT pageRowSize [ALLOW FILTERING]
+ */
+ private String buildQuery()
+ {
+ fetchKeys();
+
+ String selectColumnList = makeColumnList(getSelectColumns());
+ String partitionKeyList = makeColumnList(partitionKeys);
+
+ return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + getAdditionalWhereClauses(),
+ selectColumnList, keyspace, cfName, partitionKeyList, partitionKeyList);
+ }
+
+ private String getAdditionalWhereClauses()
+ {
+ String whereClause = "";
+ if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+ whereClause += " AND " + userDefinedWhereClauses;
+ whereClause += " LIMIT " + pageRowSize;
+ if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+ whereClause += " ALLOW FILTERING";
+ return whereClause;
+ }
+
+ private List<String> getSelectColumns()
+ {
+ List<String> selectColumns = new ArrayList<>();
+
+ if (StringUtils.isEmpty(inputColumns))
+ selectColumns.add("*");
+ else
+ {
+ // We must select all the partition keys plus any other columns the user wants
+ selectColumns.addAll(partitionKeys);
+ for (String column : Splitter.on(',').split(inputColumns))
+ {
+ if (!partitionKeys.contains(column))
+ selectColumns.add(column);
+ }
+ }
+ return selectColumns;
+ }
+
+ private String makeColumnList(Collection<String> columns)
+ {
+ return Joiner.on(',').join(Iterables.transform(columns, new Function<String, String>()
+ {
+ public String apply(String column)
+ {
+ return quote(column);
+ }
+ }));
+ }
+
+ private void fetchKeys()
+ {
+ String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name='" +
+ keyspace + "' and columnfamily_name='" + cfName + "'";
+ List<Row> rows = session.execute(query).all();
+ if (CollectionUtils.isEmpty(rows))
+ {
+ throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName);
+ }
+ int numberOfPartitionKeys = 0;
+ for (Row row : rows)
+ if (row.getString(2).equals("partition_key"))
+ numberOfPartitionKeys++;
+ String[] partitionKeyArray = new String[numberOfPartitionKeys];
+ for (Row row : rows)
+ {
+ String type = row.getString(2);
+ String column = row.getString(0);
+ if (type.equals("partition_key"))
+ {
+ int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
+ partitionKeyArray[componentIndex] = column;
+ }
+ else if (type.equals("clustering_key"))
+ {
+ clusteringKeys.add(column);
+ }
+ }
+ partitionKeys.addAll(Arrays.asList(partitionKeyArray));
+ }
+
+
+
private String quote(String identifier)
{
return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 1e48bf4..eea5d4e 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -177,7 +177,13 @@ public class CqlNativeStorage extends CqlStorage
setConnectionInformation();
CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
- CqlConfigHelper.setInputCql(conf, inputCql);
+ if (inputCql != null)
+ CqlConfigHelper.setInputCql(conf, inputCql);
+ if (columns != null)
+ CqlConfigHelper.setInputColumns(conf, columns);
+ if (whereClause != null)
+ CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+
if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
{
try
@@ -270,6 +276,10 @@ public class CqlNativeStorage extends CqlStorage
nativeSSLCipherSuites = urlQuery.get("cipher_suites");
if (urlQuery.containsKey("input_cql"))
inputCql = urlQuery.get("input_cql");
+ if (urlQuery.containsKey("columns"))
+ columns = urlQuery.get("columns");
+ if (urlQuery.containsKey("where_clause"))
+ whereClause = urlQuery.get("where_clause");
if (urlQuery.containsKey("rpc_port"))
rpcPort = urlQuery.get("rpc_port");
}
@@ -299,7 +309,8 @@ public class CqlNativeStorage extends CqlStorage
"[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
"[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
"[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
- "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage());
+ "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
+ "[&columns=<columns>][&where_clause=<where_clause>]]': " + e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 02a6d98..53f3900 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -59,9 +59,9 @@ public class CqlStorage extends AbstractCassandraStorage
protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
protected int pageSize = 1000;
- private String columns;
+ protected String columns;
protected String outputQuery;
- private String whereClause;
+ protected String whereClause;
private boolean hasCompactValueAlias = false;
public CqlStorage()
[08/14] git commit: Give CRR a default input_cql Statement
Posted by br...@apache.org.
Give CRR a default input_cql Statement
Patch by Mike Adamson, reviewed by brandonwilliams for CASSANDRA-7226
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52df514d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52df514d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52df514d
Branch: refs/heads/cassandra-2.1
Commit: 52df514dd1a95d4fc4d699d6ffa9d3bf7e844854
Parents: bd0bb6d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:00:39 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:02:34 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 138 ++++++++++++++++++-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 15 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 4 +-
5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd51e04..ddf4627 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.10
+ * Give CRR a default input_cql Statement (CASSANDRA-7226)
* Better error message when adding a collection with the same name
than a previously dropped one (CASSANDRA-6276)
* Fix validation when adding static columns (CASSANDRA-7730)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 7a5fd47..b2c8fbf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -60,7 +60,7 @@ import com.google.common.collect.Sets;
public class CqlConfigHelper
{
- private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
+ private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns";
private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
private static final String INPUT_CQL = "cassandra.input.cql";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 88c5c33..74310cf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -24,9 +24,17 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
import org.apache.cassandra.hadoop.HadoopCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +71,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
{
private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
+ public static final int DEFAULT_CQL_PAGE_LIMIT = 1000;
+
private ColumnFamilySplit split;
private RowIterator rowIterator;
@@ -74,6 +84,12 @@ public class CqlRecordReader extends RecordReader<Long, Row>
private Cluster cluster;
private Session session;
private IPartitioner partitioner;
+ private String inputColumns;
+ private String userDefinedWhereClauses;
+ private int pageRowSize;
+
+ private List<String> partitionKeys = new ArrayList<>();
+ private List<String> clusteringKeys = new ArrayList<>();
// partition keys -- key aliases
private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
@@ -92,8 +108,18 @@ public class CqlRecordReader extends RecordReader<Long, Row>
: ConfigHelper.getInputSplitSize(conf);
cfName = quote(ConfigHelper.getInputColumnFamily(conf));
keyspace = quote(ConfigHelper.getInputKeyspace(conf));
- cqlQuery = CqlConfigHelper.getInputCql(conf);
- partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context));
+ partitioner = ConfigHelper.getInputPartitioner(conf);
+ inputColumns = CqlConfigHelper.getInputcolumns(conf);
+ userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
+ Optional<Integer> pageRowSizeOptional = CqlConfigHelper.getInputPageRowSize(conf);
+ try
+ {
+ pageRowSize = pageRowSizeOptional.isPresent() ? pageRowSizeOptional.get() : DEFAULT_CQL_PAGE_LIMIT;
+ }
+ catch(NumberFormatException e)
+ {
+ pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
+ }
try
{
if (cluster != null)
@@ -125,6 +151,20 @@ public class CqlRecordReader extends RecordReader<Long, Row>
if (cluster != null)
session = cluster.connect(keyspace);
+
+ if (session == null)
+ throw new RuntimeException("Can't create connection session");
+
+ // If the user provides a CQL query then we will use it without validation
+ // otherwise we will fall back to building a query using the:
+ // inputColumns
+ // whereClauses
+ // pageRowSize
+ cqlQuery = CqlConfigHelper.getInputCql(conf);
+ if (StringUtils.isEmpty(cqlQuery))
+ cqlQuery = buildQuery();
+ logger.debug("cqlQuery {}", cqlQuery);
+
rowIterator = new RowIterator();
logger.debug("created {}", rowIterator);
}
@@ -224,9 +264,6 @@ public class CqlRecordReader extends RecordReader<Long, Row>
public RowIterator()
{
- if (session == null)
- throw new RuntimeException("Can't create connection session");
-
AbstractType type = partitioner.getTokenValidator();
ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
@@ -487,6 +524,97 @@ public class CqlRecordReader extends RecordReader<Long, Row>
}
}
/**
 * Build the fallback CQL query used when the user did not supply an
 * input_cql statement. The generated query has the form:
 *
 *   SELECT selectColumns FROM ks.cf
 *   WHERE token(pk1,...pkn)>? AND token(pk1,...pkn)<=?
 *   [AND user where clauses] LIMIT pageRowSize [ALLOW FILTERING]
 *
 * The two token() bind markers are later bound by RowIterator to the
 * split's start/end tokens.
 */
private String buildQuery()
{
    fetchKeys();

    String selectColumnList = makeColumnList(getSelectColumns());
    String partitionKeyList = makeColumnList(partitionKeys);

    return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + getAdditionalWhereClauses(),
                         selectColumnList, keyspace, cfName, partitionKeyList, partitionKeyList);
}
+
+ private String getAdditionalWhereClauses()
+ {
+ String whereClause = "";
+ if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+ whereClause += " AND " + userDefinedWhereClauses;
+ whereClause += " LIMIT " + pageRowSize;
+ if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+ whereClause += " ALLOW FILTERING";
+ return whereClause;
+ }
+
+ private List<String> getSelectColumns()
+ {
+ List<String> selectColumns = new ArrayList<>();
+
+ if (StringUtils.isEmpty(inputColumns))
+ selectColumns.add("*");
+ else
+ {
+ // We must select all the partition keys plus any other columns the user wants
+ selectColumns.addAll(partitionKeys);
+ for (String column : Splitter.on(',').split(inputColumns))
+ {
+ if (!partitionKeys.contains(column))
+ selectColumns.add(column);
+ }
+ }
+ return selectColumns;
+ }
+
+ private String makeColumnList(Collection<String> columns)
+ {
+ return Joiner.on(',').join(Iterables.transform(columns, new Function<String, String>()
+ {
+ public String apply(String column)
+ {
+ return quote(column);
+ }
+ }));
+ }
+
/**
 * Populate partitionKeys and clusteringKeys from the cluster's schema
 * tables for the configured keyspace/table.
 *
 * Result rows are (column_name, component_index, type) and are read
 * positionally: getString(0) = column_name, getInt(1) = component_index,
 * getString(2) = type. Partition key components are slotted into an
 * array by component_index so composite keys end up in declaration order.
 *
 * NOTE(review): keyspace and cfName are the quote()d (double-quoted)
 * forms, yet they are embedded inside single-quoted string literals and
 * matched against the raw names stored in system.schema_columns --
 * verify this lookup succeeds for case-sensitive identifiers.
 *
 * NOTE(review): clusteringKeys is filled in result-set order with no
 * explicit sort; presumably that is component_index order -- confirm.
 */
private void fetchKeys()
{
    // Schema query built by concatenation; inputs come from job
    // configuration, not end users.
    String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name='" +
                   keyspace + "' and columnfamily_name='" + cfName + "'";
    List<Row> rows = session.execute(query).all();
    if (CollectionUtils.isEmpty(rows))
    {
        throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName);
    }
    // First pass: count partition key components so the array below can
    // be sized exactly.
    int numberOfPartitionKeys = 0;
    for (Row row : rows)
        if (row.getString(2).equals("partition_key"))
            numberOfPartitionKeys++;
    String[] partitionKeyArray = new String[numberOfPartitionKeys];
    // Second pass: place each key column. A null component_index means a
    // single-component partition key, which belongs at index 0.
    for (Row row : rows)
    {
        String type = row.getString(2);
        String column = row.getString(0);
        if (type.equals("partition_key"))
        {
            int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
            partitionKeyArray[componentIndex] = column;
        }
        else if (type.equals("clustering_key"))
        {
            clusteringKeys.add(column);
        }
    }
    partitionKeys.addAll(Arrays.asList(partitionKeyArray));
}
+
+
+
private String quote(String identifier)
{
return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 1e48bf4..eea5d4e 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -177,7 +177,13 @@ public class CqlNativeStorage extends CqlStorage
setConnectionInformation();
CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
- CqlConfigHelper.setInputCql(conf, inputCql);
+ if (inputCql != null)
+ CqlConfigHelper.setInputCql(conf, inputCql);
+ if (columns != null)
+ CqlConfigHelper.setInputColumns(conf, columns);
+ if (whereClause != null)
+ CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+
if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
{
try
@@ -270,6 +276,10 @@ public class CqlNativeStorage extends CqlStorage
nativeSSLCipherSuites = urlQuery.get("cipher_suites");
if (urlQuery.containsKey("input_cql"))
inputCql = urlQuery.get("input_cql");
+ if (urlQuery.containsKey("columns"))
+ columns = urlQuery.get("columns");
+ if (urlQuery.containsKey("where_clause"))
+ whereClause = urlQuery.get("where_clause");
if (urlQuery.containsKey("rpc_port"))
rpcPort = urlQuery.get("rpc_port");
}
@@ -299,7 +309,8 @@ public class CqlNativeStorage extends CqlStorage
"[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
"[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
"[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
- "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage());
+ "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
+ "[&columns=<columns>][&where_clause=<where_clause>]]': " + e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 02a6d98..53f3900 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -59,9 +59,9 @@ public class CqlStorage extends AbstractCassandraStorage
protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
protected int pageSize = 1000;
- private String columns;
+ protected String columns;
protected String outputQuery;
- private String whereClause;
+ protected String whereClause;
private boolean hasCompactValueAlias = false;
public CqlStorage()
[09/14] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0
Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1.0
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d744b5d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d744b5d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d744b5d
Branch: refs/heads/cassandra-2.1
Commit: 1d744b5d4d2c18c3f43fd8a1cdd33b333186d290
Parents: ce96a2a 52df514d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:03:13 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:03:13 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 138 ++++++++++++++++++-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 15 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 4 +-
5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 64b4d65,ddf4627..f34b912
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
-2.0.10
+2.1.0-rc6
+ * Include stress yaml example in release and deb (CASSANDRA-7717)
+ * workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695)
+ * cqlsh DESC CLUSTER fails retrieving ring information (CASSANDRA-7687)
+ * Fix binding null values inside UDT (CASSANDRA-7685)
+ * Fix UDT field selection with empty fields (CASSANDRA-7670)
+ * Bogus deserialization of static cells from sstable (CASSANDRA-7684)
+Merged from 2.0:
+ * Give CRR a default input_cql Statement (CASSANDRA-7226)
* Better error message when adding a collection with the same name
than a previously dropped one (CASSANDRA-6276)
* Fix validation when adding static columns (CASSANDRA-7730)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
[06/14] git commit: Give CRR a default input_cql Statement
Posted by br...@apache.org.
Give CRR a default input_cql Statement
Patch by Mike Adamson, reviewed by brandonwilliams for CASSANDRA-7226
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52df514d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52df514d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52df514d
Branch: refs/heads/trunk
Commit: 52df514dd1a95d4fc4d699d6ffa9d3bf7e844854
Parents: bd0bb6d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:00:39 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:02:34 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 138 ++++++++++++++++++-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 15 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 4 +-
5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd51e04..ddf4627 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.10
+ * Give CRR a default input_cql Statement (CASSANDRA-7226)
* Better error message when adding a collection with the same name
than a previously dropped one (CASSANDRA-6276)
* Fix validation when adding static columns (CASSANDRA-7730)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 7a5fd47..b2c8fbf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -60,7 +60,7 @@ import com.google.common.collect.Sets;
public class CqlConfigHelper
{
- private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
+ private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns";
private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
private static final String INPUT_CQL = "cassandra.input.cql";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 88c5c33..74310cf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -24,9 +24,17 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
import org.apache.cassandra.hadoop.HadoopCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +71,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
{
private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
+ public static final int DEFAULT_CQL_PAGE_LIMIT = 1000;
+
private ColumnFamilySplit split;
private RowIterator rowIterator;
@@ -74,6 +84,12 @@ public class CqlRecordReader extends RecordReader<Long, Row>
private Cluster cluster;
private Session session;
private IPartitioner partitioner;
+ private String inputColumns;
+ private String userDefinedWhereClauses;
+ private int pageRowSize;
+
+ private List<String> partitionKeys = new ArrayList<>();
+ private List<String> clusteringKeys = new ArrayList<>();
// partition keys -- key aliases
private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
@@ -92,8 +108,18 @@ public class CqlRecordReader extends RecordReader<Long, Row>
: ConfigHelper.getInputSplitSize(conf);
cfName = quote(ConfigHelper.getInputColumnFamily(conf));
keyspace = quote(ConfigHelper.getInputKeyspace(conf));
- cqlQuery = CqlConfigHelper.getInputCql(conf);
- partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context));
+ partitioner = ConfigHelper.getInputPartitioner(conf);
+ inputColumns = CqlConfigHelper.getInputcolumns(conf);
+ userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
+ Optional<Integer> pageRowSizeOptional = CqlConfigHelper.getInputPageRowSize(conf);
+ try
+ {
+ pageRowSize = pageRowSizeOptional.isPresent() ? pageRowSizeOptional.get() : DEFAULT_CQL_PAGE_LIMIT;
+ }
+ catch(NumberFormatException e)
+ {
+ pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
+ }
try
{
if (cluster != null)
@@ -125,6 +151,20 @@ public class CqlRecordReader extends RecordReader<Long, Row>
if (cluster != null)
session = cluster.connect(keyspace);
+
+ if (session == null)
+ throw new RuntimeException("Can't create connection session");
+
+ // If the user provides a CQL query then we will use it without validation
+ // otherwise we will fall back to building a query using the:
+ // inputColumns
+ // whereClauses
+ // pageRowSize
+ cqlQuery = CqlConfigHelper.getInputCql(conf);
+ if (StringUtils.isEmpty(cqlQuery))
+ cqlQuery = buildQuery();
+ logger.debug("cqlQuery {}", cqlQuery);
+
rowIterator = new RowIterator();
logger.debug("created {}", rowIterator);
}
@@ -224,9 +264,6 @@ public class CqlRecordReader extends RecordReader<Long, Row>
public RowIterator()
{
- if (session == null)
- throw new RuntimeException("Can't create connection session");
-
AbstractType type = partitioner.getTokenValidator();
ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
@@ -487,6 +524,97 @@ public class CqlRecordReader extends RecordReader<Long, Row>
}
}
/**
 * Build the fallback CQL query used when the user did not supply an
 * input_cql statement. The generated query has the form:
 *
 *   SELECT selectColumns FROM ks.cf
 *   WHERE token(pk1,...pkn)>? AND token(pk1,...pkn)<=?
 *   [AND user where clauses] LIMIT pageRowSize [ALLOW FILTERING]
 *
 * The two token() bind markers are later bound by RowIterator to the
 * split's start/end tokens.
 */
private String buildQuery()
{
    fetchKeys();

    String selectColumnList = makeColumnList(getSelectColumns());
    String partitionKeyList = makeColumnList(partitionKeys);

    return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + getAdditionalWhereClauses(),
                         selectColumnList, keyspace, cfName, partitionKeyList, partitionKeyList);
}
+
+ private String getAdditionalWhereClauses()
+ {
+ String whereClause = "";
+ if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+ whereClause += " AND " + userDefinedWhereClauses;
+ whereClause += " LIMIT " + pageRowSize;
+ if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+ whereClause += " ALLOW FILTERING";
+ return whereClause;
+ }
+
+ private List<String> getSelectColumns()
+ {
+ List<String> selectColumns = new ArrayList<>();
+
+ if (StringUtils.isEmpty(inputColumns))
+ selectColumns.add("*");
+ else
+ {
+ // We must select all the partition keys plus any other columns the user wants
+ selectColumns.addAll(partitionKeys);
+ for (String column : Splitter.on(',').split(inputColumns))
+ {
+ if (!partitionKeys.contains(column))
+ selectColumns.add(column);
+ }
+ }
+ return selectColumns;
+ }
+
+ private String makeColumnList(Collection<String> columns)
+ {
+ return Joiner.on(',').join(Iterables.transform(columns, new Function<String, String>()
+ {
+ public String apply(String column)
+ {
+ return quote(column);
+ }
+ }));
+ }
+
/**
 * Populate partitionKeys and clusteringKeys from the cluster's schema
 * tables for the configured keyspace/table.
 *
 * Result rows are (column_name, component_index, type) and are read
 * positionally: getString(0) = column_name, getInt(1) = component_index,
 * getString(2) = type. Partition key components are slotted into an
 * array by component_index so composite keys end up in declaration order.
 *
 * NOTE(review): keyspace and cfName are the quote()d (double-quoted)
 * forms, yet they are embedded inside single-quoted string literals and
 * matched against the raw names stored in system.schema_columns --
 * verify this lookup succeeds for case-sensitive identifiers.
 *
 * NOTE(review): clusteringKeys is filled in result-set order with no
 * explicit sort; presumably that is component_index order -- confirm.
 */
private void fetchKeys()
{
    // Schema query built by concatenation; inputs come from job
    // configuration, not end users.
    String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name='" +
                   keyspace + "' and columnfamily_name='" + cfName + "'";
    List<Row> rows = session.execute(query).all();
    if (CollectionUtils.isEmpty(rows))
    {
        throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName);
    }
    // First pass: count partition key components so the array below can
    // be sized exactly.
    int numberOfPartitionKeys = 0;
    for (Row row : rows)
        if (row.getString(2).equals("partition_key"))
            numberOfPartitionKeys++;
    String[] partitionKeyArray = new String[numberOfPartitionKeys];
    // Second pass: place each key column. A null component_index means a
    // single-component partition key, which belongs at index 0.
    for (Row row : rows)
    {
        String type = row.getString(2);
        String column = row.getString(0);
        if (type.equals("partition_key"))
        {
            int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
            partitionKeyArray[componentIndex] = column;
        }
        else if (type.equals("clustering_key"))
        {
            clusteringKeys.add(column);
        }
    }
    partitionKeys.addAll(Arrays.asList(partitionKeyArray));
}
+
+
+
private String quote(String identifier)
{
return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 1e48bf4..eea5d4e 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -177,7 +177,13 @@ public class CqlNativeStorage extends CqlStorage
setConnectionInformation();
CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
- CqlConfigHelper.setInputCql(conf, inputCql);
+ if (inputCql != null)
+ CqlConfigHelper.setInputCql(conf, inputCql);
+ if (columns != null)
+ CqlConfigHelper.setInputColumns(conf, columns);
+ if (whereClause != null)
+ CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+
if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
{
try
@@ -270,6 +276,10 @@ public class CqlNativeStorage extends CqlStorage
nativeSSLCipherSuites = urlQuery.get("cipher_suites");
if (urlQuery.containsKey("input_cql"))
inputCql = urlQuery.get("input_cql");
+ if (urlQuery.containsKey("columns"))
+ columns = urlQuery.get("columns");
+ if (urlQuery.containsKey("where_clause"))
+ whereClause = urlQuery.get("where_clause");
if (urlQuery.containsKey("rpc_port"))
rpcPort = urlQuery.get("rpc_port");
}
@@ -299,7 +309,8 @@ public class CqlNativeStorage extends CqlStorage
"[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
"[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
"[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
- "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage());
+ "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
+ "[&columns=<columns>][&where_clause=<where_clause>]]': " + e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 02a6d98..53f3900 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -59,9 +59,9 @@ public class CqlStorage extends AbstractCassandraStorage
protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
protected int pageSize = 1000;
- private String columns;
+ protected String columns;
protected String outputQuery;
- private String whereClause;
+ protected String whereClause;
private boolean hasCompactValueAlias = false;
public CqlStorage()
[11/14] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0
Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1.0
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d744b5d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d744b5d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d744b5d
Branch: refs/heads/trunk
Commit: 1d744b5d4d2c18c3f43fd8a1cdd33b333186d290
Parents: ce96a2a 52df514d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:03:13 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:03:13 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 138 ++++++++++++++++++-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 15 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 4 +-
5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 64b4d65,ddf4627..f34b912
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
-2.0.10
+2.1.0-rc6
+ * Include stress yaml example in release and deb (CASSANDRA-7717)
+ * workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695)
+ * cqlsh DESC CLUSTER fails retrieving ring information (CASSANDRA-7687)
+ * Fix binding null values inside UDT (CASSANDRA-7685)
+ * Fix UDT field selection with empty fields (CASSANDRA-7670)
+ * Bogus deserialization of static cells from sstable (CASSANDRA-7684)
+Merged from 2.0:
+ * Give CRR a default input_cql Statement (CASSANDRA-7226)
* Better error message when adding a collection with the same name
than a previously dropped one (CASSANDRA-6276)
* Fix validation when adding static columns (CASSANDRA-7730)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
[05/14] git commit: Give CRR a default input_cql Statement
Posted by br...@apache.org.
Give CRR a default input_cql Statement
Patch by Mike Adamson, reviewed by brandonwilliams for CASSANDRA-7226
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52df514d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52df514d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52df514d
Branch: refs/heads/cassandra-2.1.0
Commit: 52df514dd1a95d4fc4d699d6ffa9d3bf7e844854
Parents: bd0bb6d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:00:39 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:02:34 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 138 ++++++++++++++++++-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 15 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 4 +-
5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd51e04..ddf4627 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.10
+ * Give CRR a default input_cql Statement (CASSANDRA-7226)
* Better error message when adding a collection with the same name
than a previously dropped one (CASSANDRA-6276)
* Fix validation when adding static columns (CASSANDRA-7730)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 7a5fd47..b2c8fbf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -60,7 +60,7 @@ import com.google.common.collect.Sets;
public class CqlConfigHelper
{
- private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
+ private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns";
private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
private static final String INPUT_CQL = "cassandra.input.cql";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 88c5c33..74310cf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -24,9 +24,17 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
import org.apache.cassandra.hadoop.HadoopCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +71,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
{
private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
+ public static final int DEFAULT_CQL_PAGE_LIMIT = 1000;
+
private ColumnFamilySplit split;
private RowIterator rowIterator;
@@ -74,6 +84,12 @@ public class CqlRecordReader extends RecordReader<Long, Row>
private Cluster cluster;
private Session session;
private IPartitioner partitioner;
+ private String inputColumns;
+ private String userDefinedWhereClauses;
+ private int pageRowSize;
+
+ private List<String> partitionKeys = new ArrayList<>();
+ private List<String> clusteringKeys = new ArrayList<>();
// partition keys -- key aliases
private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
@@ -92,8 +108,18 @@ public class CqlRecordReader extends RecordReader<Long, Row>
: ConfigHelper.getInputSplitSize(conf);
cfName = quote(ConfigHelper.getInputColumnFamily(conf));
keyspace = quote(ConfigHelper.getInputKeyspace(conf));
- cqlQuery = CqlConfigHelper.getInputCql(conf);
- partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context));
+ partitioner = ConfigHelper.getInputPartitioner(conf);
+ inputColumns = CqlConfigHelper.getInputcolumns(conf);
+ userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
+ Optional<Integer> pageRowSizeOptional = CqlConfigHelper.getInputPageRowSize(conf);
+ try
+ {
+ pageRowSize = pageRowSizeOptional.isPresent() ? pageRowSizeOptional.get() : DEFAULT_CQL_PAGE_LIMIT;
+ }
+ catch(NumberFormatException e)
+ {
+ pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
+ }
try
{
if (cluster != null)
@@ -125,6 +151,20 @@ public class CqlRecordReader extends RecordReader<Long, Row>
if (cluster != null)
session = cluster.connect(keyspace);
+
+ if (session == null)
+ throw new RuntimeException("Can't create connection session");
+
+ // If the user provides a CQL query then we will use it without validation
+ // otherwise we will fall back to building a query using the:
+ // inputColumns
+ // whereClauses
+ // pageRowSize
+ cqlQuery = CqlConfigHelper.getInputCql(conf);
+ if (StringUtils.isEmpty(cqlQuery))
+ cqlQuery = buildQuery();
+ logger.debug("cqlQuery {}", cqlQuery);
+
rowIterator = new RowIterator();
logger.debug("created {}", rowIterator);
}
@@ -224,9 +264,6 @@ public class CqlRecordReader extends RecordReader<Long, Row>
public RowIterator()
{
- if (session == null)
- throw new RuntimeException("Can't create connection session");
-
AbstractType type = partitioner.getTokenValidator();
ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
@@ -487,6 +524,97 @@ public class CqlRecordReader extends RecordReader<Long, Row>
}
}
+ /**
+ * Build a query for the reader of the form:
+ *
+ * SELECT * FROM ks.cf WHERE token(pk1,...pkn)>? AND token(pk1,...pkn)<=? [AND user where clauses]
+ * LIMIT pageRowSize [ALLOW FILTERING]
+ */
+ private String buildQuery()
+ {
+ fetchKeys();
+
+ String selectColumnList = makeColumnList(getSelectColumns());
+ String partitionKeyList = makeColumnList(partitionKeys);
+
+ return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + getAdditionalWhereClauses(),
+ selectColumnList, keyspace, cfName, partitionKeyList, partitionKeyList);
+ }
+
+ private String getAdditionalWhereClauses()
+ {
+ String whereClause = "";
+ if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+ whereClause += " AND " + userDefinedWhereClauses;
+ whereClause += " LIMIT " + pageRowSize;
+ if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+ whereClause += " ALLOW FILTERING";
+ return whereClause;
+ }
+
+ private List<String> getSelectColumns()
+ {
+ List<String> selectColumns = new ArrayList<>();
+
+ if (StringUtils.isEmpty(inputColumns))
+ selectColumns.add("*");
+ else
+ {
+ // We must select all the partition keys plus any other columns the user wants
+ selectColumns.addAll(partitionKeys);
+ for (String column : Splitter.on(',').split(inputColumns))
+ {
+ if (!partitionKeys.contains(column))
+ selectColumns.add(column);
+ }
+ }
+ return selectColumns;
+ }
+
+ private String makeColumnList(Collection<String> columns)
+ {
+ return Joiner.on(',').join(Iterables.transform(columns, new Function<String, String>()
+ {
+ public String apply(String column)
+ {
+ return quote(column);
+ }
+ }));
+ }
+
+ private void fetchKeys()
+ {
+ String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name='" +
+ keyspace + "' and columnfamily_name='" + cfName + "'";
+ List<Row> rows = session.execute(query).all();
+ if (CollectionUtils.isEmpty(rows))
+ {
+ throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName);
+ }
+ int numberOfPartitionKeys = 0;
+ for (Row row : rows)
+ if (row.getString(2).equals("partition_key"))
+ numberOfPartitionKeys++;
+ String[] partitionKeyArray = new String[numberOfPartitionKeys];
+ for (Row row : rows)
+ {
+ String type = row.getString(2);
+ String column = row.getString(0);
+ if (type.equals("partition_key"))
+ {
+ int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
+ partitionKeyArray[componentIndex] = column;
+ }
+ else if (type.equals("clustering_key"))
+ {
+ clusteringKeys.add(column);
+ }
+ }
+ partitionKeys.addAll(Arrays.asList(partitionKeyArray));
+ }
+
+
+
private String quote(String identifier)
{
return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 1e48bf4..eea5d4e 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -177,7 +177,13 @@ public class CqlNativeStorage extends CqlStorage
setConnectionInformation();
CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
- CqlConfigHelper.setInputCql(conf, inputCql);
+ if (inputCql != null)
+ CqlConfigHelper.setInputCql(conf, inputCql);
+ if (columns != null)
+ CqlConfigHelper.setInputColumns(conf, columns);
+ if (whereClause != null)
+ CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+
if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
{
try
@@ -270,6 +276,10 @@ public class CqlNativeStorage extends CqlStorage
nativeSSLCipherSuites = urlQuery.get("cipher_suites");
if (urlQuery.containsKey("input_cql"))
inputCql = urlQuery.get("input_cql");
+ if (urlQuery.containsKey("columns"))
+ columns = urlQuery.get("columns");
+ if (urlQuery.containsKey("where_clause"))
+ whereClause = urlQuery.get("where_clause");
if (urlQuery.containsKey("rpc_port"))
rpcPort = urlQuery.get("rpc_port");
}
@@ -299,7 +309,8 @@ public class CqlNativeStorage extends CqlStorage
"[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
"[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
"[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
- "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage());
+ "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
+ "[columns=<columns>][where_clause=<where_clause>]]': " + e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 02a6d98..53f3900 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -59,9 +59,9 @@ public class CqlStorage extends AbstractCassandraStorage
protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
protected int pageSize = 1000;
- private String columns;
+ protected String columns;
protected String outputQuery;
- private String whereClause;
+ protected String whereClause;
private boolean hasCompactValueAlias = false;
public CqlStorage()
[12/14] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1
Posted by br...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c6078a7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c6078a7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c6078a7
Branch: refs/heads/trunk
Commit: 0c6078a70cfa83f851c1d2b9ec039e2f00161bfc
Parents: 6eb76a2 1d744b5
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:03:32 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:03:32 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 138 ++++++++++++++++++-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 15 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 4 +-
5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c6078a7/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c6078a7/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c6078a7/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
[10/14] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0
Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1.0
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d744b5d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d744b5d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d744b5d
Branch: refs/heads/cassandra-2.1.0
Commit: 1d744b5d4d2c18c3f43fd8a1cdd33b333186d290
Parents: ce96a2a 52df514d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:03:13 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:03:13 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 138 ++++++++++++++++++-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 15 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 4 +-
5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 64b4d65,ddf4627..f34b912
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
-2.0.10
+2.1.0-rc6
+ * Include stress yaml example in release and deb (CASSANDRA-7717)
+ * workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695)
+ * cqlsh DESC CLUSTER fails retrieving ring information (CASSANDRA-7687)
+ * Fix binding null values inside UDT (CASSANDRA-7685)
+ * Fix UDT field selection with empty fields (CASSANDRA-7670)
+ * Bogus deserialization of static cells from sstable (CASSANDRA-7684)
+Merged from 2.0:
+ * Give CRR a default input_cql Statement (CASSANDRA-7226)
* Better error message when adding a collection with the same name
than a previously dropped one (CASSANDRA-6276)
* Fix validation when adding static columns (CASSANDRA-7730)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------