Posted to commits@cassandra.apache.org by br...@apache.org on 2014/08/11 20:03:56 UTC

[01/14] git commit: Fix validation when adding static columns

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 bd0bb6df4 -> 52df514dd
  refs/heads/cassandra-2.1 6eb76a212 -> 0c6078a70
  refs/heads/cassandra-2.1.0 ce96a2a12 -> 1d744b5d4
  refs/heads/trunk c7e191ba1 -> 755d345fe


Fix validation when adding static columns

patch by slebresne; reviewed by iamaleksey for CASSANDRA-7730


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73b02d67
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73b02d67
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73b02d67

Branch: refs/heads/trunk
Commit: 73b02d67c1fcea9a2f773251a0a525ec51b7477a
Parents: 2692c29
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Aug 11 17:44:14 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Aug 11 17:45:21 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                                 | 1 +
 .../cassandra/cql3/statements/AlterTableStatement.java      | 9 +++++++--
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/73b02d67/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d6976db..4a3e086 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * Fix validation when adding static columns (CASSANDRA-7730)
  * (Thrift) fix range deletion of supercolumns (CASSANDRA-7733)
  * Fix potential AssertionError in RangeTombstoneList (CASSANDRA-7700)
  * Validate arguments of blobAs* functions (CASSANDRA-7707)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73b02d67/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 85b3547..88f0de8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -89,8 +89,13 @@ public class AlterTableStatement extends SchemaAlteringStatement
                 if (cfDef.isCompact)
                     throw new InvalidRequestException("Cannot add new column to a COMPACT STORAGE table");
 
-                if (isStatic && !cfDef.isComposite)
-                    throw new InvalidRequestException("Static columns are not allowed in COMPACT STORAGE tables");
+                if (isStatic)
+                {
+                    if (!cfDef.isComposite)
+                        throw new InvalidRequestException("Static columns are not allowed in COMPACT STORAGE tables");
+                    if (cfDef.clusteringColumns().isEmpty())
+                        throw new InvalidRequestException("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
+                }
 
                 if (name != null)
                 {
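
For reference, a minimal sketch (not part of the commit) of what the new ADD validation accepts and rejects, written against the CQLTester helpers (createTable, execute, assertInvalid) used by the AlterTableTest added in the 2.1 merge below; the table and column names are hypothetical:

    @Test
    public void testAddStaticColumnValidation() throws Throwable
    {
        // No clustering column: a static column would never be useful, so ADD must be rejected.
        createTable("CREATE TABLE %s (k int PRIMARY KEY, v int)");
        assertInvalid("ALTER TABLE %s ADD s int static");

        // With at least one clustering column the same ALTER is accepted.
        createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c))");
        execute("ALTER TABLE %s ADD s int static");
    }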


[03/14] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0

Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1.0

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ce96a2a1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ce96a2a1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ce96a2a1

Branch: refs/heads/trunk
Commit: ce96a2a120508ba0a31cc35e44905eae808f4be3
Parents: 4c387e4 bd0bb6d
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Aug 11 18:23:43 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Aug 11 18:23:43 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../cql3/statements/AlterTableStatement.java    | 23 +++++-
 .../apache/cassandra/cql3/AlterTableTest.java   | 86 ++++++++++++++++++++
 3 files changed, 110 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce96a2a1/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 910feb4,cd51e04..64b4d65
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,7 +1,14 @@@
 -2.0.10
 +2.1.0-rc6
 + * Include stress yaml example in release and deb (CASSANDRA-7717)
 + * workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695)
 + * cqlsh DESC CLUSTER fails retrieving ring information (CASSANDRA-7687)
 + * Fix binding null values inside UDT (CASSANDRA-7685)
 + * Fix UDT field selection with empty fields (CASSANDRA-7670)
 + * Bogus deserialization of static cells from sstable (CASSANDRA-7684)
 +Merged from 2.0:
+  * Better error message when adding a collection with the same name
+    as a previously dropped one (CASSANDRA-6276)
+  * Fix validation when adding static columns (CASSANDRA-7730)
   * (Thrift) fix range deletion of supercolumns (CASSANDRA-7733)
   * Fix potential AssertionError in RangeTombstoneList (CASSANDRA-7700)
   * Validate arguments of blobAs* functions (CASSANDRA-7707)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce96a2a1/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 273ee11,136c430..be28943
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@@ -85,16 -86,23 +85,23 @@@ public class AlterTableStatement extend
          switch (oType)
          {
              case ADD:
 -                if (cfDef.isCompact)
 +                if (cfm.comparator.isDense())
                      throw new InvalidRequestException("Cannot add new column to a COMPACT STORAGE table");
-                 if (isStatic && !cfm.comparator.isCompound())
-                     throw new InvalidRequestException("Static columns are not allowed in COMPACT STORAGE tables");
+ 
+                 if (isStatic)
+                 {
 -                    if (!cfDef.isComposite)
++                    if (!cfm.comparator.isCompound())
+                         throw new InvalidRequestException("Static columns are not allowed in COMPACT STORAGE tables");
 -                    if (cfDef.clusteringColumns().isEmpty())
++                    if (cfm.clusteringColumns().isEmpty())
+                         throw new InvalidRequestException("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
+                 }
+ 
 -                if (name != null)
 +                if (def != null)
                  {
 -                    switch (name.kind)
 +                    switch (def.kind)
                      {
 -                        case KEY_ALIAS:
 -                        case COLUMN_ALIAS:
 +                        case PARTITION_KEY:
 +                        case CLUSTERING_COLUMN:
                              throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with a PRIMARY KEY part", columnName));
                          default:
                              throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with an existing column", columnName));
@@@ -104,18 -112,39 +111,30 @@@
                  AbstractType<?> type = validator.getType();
                  if (type instanceof CollectionType)
                  {
 -                    if (!cfDef.isComposite)
 +                    if (!cfm.comparator.supportCollections())
                          throw new InvalidRequestException("Cannot use collection types with non-composite PRIMARY KEY");
 -                    if (cfDef.cfm.isSuper())
 +                    if (cfm.isSuper())
                          throw new InvalidRequestException("Cannot use collection types with Super column family");
  
 -                    Map<ByteBuffer, CollectionType> collections = cfDef.hasCollections
 -                                                                ? new HashMap<ByteBuffer, CollectionType>(cfDef.getCollectionType().defined)
 -                                                                : new HashMap<ByteBuffer, CollectionType>();
+ 
+                     // If there used to be a collection column with the same name (that has been dropped), it will
+                     // still appear in the ColumnToCollectionType for reasons explained in #6276. The same
+                     // reason means that we can't allow adding a new collection with that name (see the ticket for details).
 -                    CollectionType previous = collections.get(columnName.key);
 -                    if (previous != null && !type.isCompatibleWith(previous))
 -                        throw new InvalidRequestException(String.format("Cannot add a collection with the name %s " +
 -                                    "because a collection with the same name and a different type has already been used in the past", columnName));
++                    if (cfm.comparator.hasCollections())
++                    {
++                        CollectionType previous = cfm.comparator.collectionType() == null ? null : cfm.comparator.collectionType().defined.get(columnName.bytes);
++                        if (previous != null && !type.isCompatibleWith(previous))
++                            throw new InvalidRequestException(String.format("Cannot add a collection with the name %s " +
++                                        "because a collection with the same name and a different type has already been used in the past", columnName));
++                    }
+ 
 -                    collections.put(columnName.key, (CollectionType)type);
 -                    ColumnToCollectionType newColType = ColumnToCollectionType.getInstance(collections);
 -                    List<AbstractType<?>> ctypes = new ArrayList<AbstractType<?>>(((CompositeType)cfm.comparator).types);
 -                    if (cfDef.hasCollections)
 -                        ctypes.set(ctypes.size() - 1, newColType);
 -                    else
 -                        ctypes.add(newColType);
 -                    cfm.comparator = CompositeType.getInstance(ctypes);
 +                    cfm.comparator = cfm.comparator.addOrUpdateCollection(columnName, (CollectionType)type);
                  }
  
 -                Integer componentIndex = cfDef.isComposite
 -                                       ? ((CompositeType)meta.comparator).types.size() - (cfDef.hasCollections ? 2 : 1)
 -                                       : null;
 +                Integer componentIndex = cfm.comparator.isCompound() ? cfm.comparator.clusteringPrefixSize() : null;
                  cfm.addColumnDefinition(isStatic
 -                                        ? ColumnDefinition.staticDef(columnName.key, type, componentIndex)
 -                                        : ColumnDefinition.regularDef(columnName.key, type, componentIndex));
 +                                        ? ColumnDefinition.staticDef(cfm, columnName.bytes, type, componentIndex)
 +                                        : ColumnDefinition.regularDef(cfm, columnName.bytes, type, componentIndex));
                  break;
  
              case ALTER:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce96a2a1/test/unit/org/apache/cassandra/cql3/AlterTableTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/AlterTableTest.java
index 0000000,0000000..f5747ed
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/AlterTableTest.java
@@@ -1,0 -1,0 +1,86 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.cassandra.cql3;
++
++import org.junit.Test;
++
++import org.apache.cassandra.exceptions.InvalidRequestException;
++
++public class AlterTableTest extends CQLTester
++{
++    @Test
++    public void testAddList() throws Throwable
++    {
++        createTable("CREATE TABLE %s (id text PRIMARY KEY, content text);");
++        execute("ALTER TABLE %s ADD myCollection list<text>;");
++        execute("INSERT INTO %s (id, content , myCollection) VALUES ('test', 'first test', ['first element']);");
++
++        assertRows(execute("SELECT * FROM %s;"), row("test", "first test", list("first element")));
++    }
++
++    @Test
++    public void testDropList() throws Throwable
++    {
++        createTable("CREATE TABLE %s (id text PRIMARY KEY, content text, myCollection list<text>);");
++        execute("INSERT INTO %s (id, content , myCollection) VALUES ('test', 'first test', ['first element']);");
++        execute("ALTER TABLE %s DROP myCollection;");
++
++        assertRows(execute("SELECT * FROM %s;"), row("test", "first test"));
++    }
++    @Test
++    public void testAddMap() throws Throwable
++    {
++        createTable("CREATE TABLE %s (id text PRIMARY KEY, content text);");
++        execute("ALTER TABLE %s ADD myCollection map<text, text>;");
++        execute("INSERT INTO %s (id, content , myCollection) VALUES ('test', 'first test', { '1' : 'first element'});");
++
++        assertRows(execute("SELECT * FROM %s;"), row("test", "first test", map("1", "first element")));
++    }
++
++    @Test
++    public void testDropMap() throws Throwable
++    {
++        createTable("CREATE TABLE %s (id text PRIMARY KEY, content text, myCollection map<text, text>);");
++        execute("INSERT INTO %s (id, content , myCollection) VALUES ('test', 'first test', { '1' : 'first element'});");
++        execute("ALTER TABLE %s DROP myCollection;");
++
++        assertRows(execute("SELECT * FROM %s;"), row("test", "first test"));
++    }
++
++    @Test
++    public void testDropListAndAddListWithSameName() throws Throwable
++    {
++        createTable("CREATE TABLE %s (id text PRIMARY KEY, content text, myCollection list<text>);");
++        execute("INSERT INTO %s (id, content , myCollection) VALUES ('test', 'first test', ['first element']);");
++        execute("ALTER TABLE %s DROP myCollection;");
++        execute("ALTER TABLE %s ADD myCollection list<text>;");
++
++        assertRows(execute("SELECT * FROM %s;"), row("test", "first test", null));
++        execute("UPDATE %s set myCollection = ['second element'] WHERE id = 'test';");
++        assertRows(execute("SELECT * FROM %s;"), row("test", "first test", list("second element")));
++    }
++    @Test
++    public void testDropListAndAddMapWithSameName() throws Throwable
++    {
++        createTable("CREATE TABLE %s (id text PRIMARY KEY, content text, myCollection list<text>);");
++        execute("INSERT INTO %s (id, content , myCollection) VALUES ('test', 'first test', ['first element']);");
++        execute("ALTER TABLE %s DROP myCollection;");
++
++        assertInvalid("ALTER TABLE %s ADD myCollection map<int, int>;");
++    }
++}


[02/14] git commit: Better error message when trying to add a collection with the same name as a previously dropped one

Posted by br...@apache.org.
Better error message when trying to add a collection with the same name as a previously dropped one


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd0bb6df
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd0bb6df
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd0bb6df

Branch: refs/heads/trunk
Commit: bd0bb6df4613588967ab0c67c268a231c112b321
Parents: 73b02d6
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Aug 11 18:07:35 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Aug 11 18:07:35 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                                  | 2 ++
 .../cassandra/cql3/statements/AlterTableStatement.java       | 8 ++++++++
 2 files changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd0bb6df/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a3e086..cd51e04 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.0.10
+ * Better error message when adding a collection with the same name
+   as a previously dropped one (CASSANDRA-6276)
  * Fix validation when adding static columns (CASSANDRA-7730)
  * (Thrift) fix range deletion of supercolumns (CASSANDRA-7733)
  * Fix potential AssertionError in RangeTombstoneList (CASSANDRA-7700)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd0bb6df/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index 88f0de8..136c430 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -121,6 +121,14 @@ public class AlterTableStatement extends SchemaAlteringStatement
                                                                 ? new HashMap<ByteBuffer, CollectionType>(cfDef.getCollectionType().defined)
                                                                 : new HashMap<ByteBuffer, CollectionType>();
 
+                    // If there used to be a collection column with the same name (that has been dropped), it will
+                    // still appear in the ColumnToCollectionType for reasons explained in #6276. The same
+                    // reason means that we can't allow adding a new collection with that name (see the ticket for details).
+                    CollectionType previous = collections.get(columnName.key);
+                    if (previous != null && !type.isCompatibleWith(previous))
+                        throw new InvalidRequestException(String.format("Cannot add a collection with the name %s " +
+                                    "because a collection with the same name and a different type has already been used in the past", columnName));
+
                     collections.put(columnName.key, (CollectionType)type);
                     ColumnToCollectionType newColType = ColumnToCollectionType.getInstance(collections);
                     List<AbstractType<?>> ctypes = new ArrayList<AbstractType<?>>(((CompositeType)cfm.comparator).types);


[13/14] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1

Posted by br...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c6078a7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c6078a7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c6078a7

Branch: refs/heads/cassandra-2.1
Commit: 0c6078a70cfa83f851c1d2b9ec039e2f00161bfc
Parents: 6eb76a2 1d744b5
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:03:32 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:03:32 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 138 ++++++++++++++++++-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  15 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   4 +-
 5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c6078a7/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c6078a7/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c6078a7/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------


[04/14] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1

Posted by br...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6eb76a21
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6eb76a21
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6eb76a21

Branch: refs/heads/trunk
Commit: 6eb76a2122c276bd252196b9ffc18764dfae4e3e
Parents: 50ee3a7 ce96a2a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Aug 11 18:24:03 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Aug 11 18:24:03 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../cql3/statements/AlterTableStatement.java    | 23 +++++-
 .../apache/cassandra/cql3/AlterTableTest.java   | 86 ++++++++++++++++++++
 3 files changed, 110 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6eb76a21/CHANGES.txt
----------------------------------------------------------------------


[14/14] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by br...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/755d345f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/755d345f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/755d345f

Branch: refs/heads/trunk
Commit: 755d345fefafa1384e26269e6381e5b517216a42
Parents: c7e191b 0c6078a
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:03:47 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:03:47 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 .../cql3/statements/AlterTableStatement.java    |  23 +++-
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 138 ++++++++++++++++++-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  15 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   4 +-
 .../apache/cassandra/cql3/AlterTableTest.java   |  86 ++++++++++++
 7 files changed, 260 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/755d345f/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/755d345f/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/755d345f/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/755d345f/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------


[07/14] git commit: Give CRR a default input_cql Statement

Posted by br...@apache.org.
Give CRR a default input_cql Statement

Patch by Mike Adamson, reviewed by brandonwilliams for CASSANDRA-7226


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52df514d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52df514d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52df514d

Branch: refs/heads/cassandra-2.0
Commit: 52df514dd1a95d4fc4d699d6ffa9d3bf7e844854
Parents: bd0bb6d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:00:39 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:02:34 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 138 ++++++++++++++++++-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  15 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   4 +-
 5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd51e04..ddf4627 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * Give CRR a default input_cql Statement (CASSANDRA-7226)
  * Better error message when adding a collection with the same name
   as a previously dropped one (CASSANDRA-6276)
  * Fix validation when adding static columns (CASSANDRA-7730)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 7a5fd47..b2c8fbf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -60,7 +60,7 @@ import com.google.common.collect.Sets;
 
 public class CqlConfigHelper
 {
-    private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
+    private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns";
     private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
     private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
     private static final String INPUT_CQL = "cassandra.input.cql";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 88c5c33..74310cf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -24,9 +24,17 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
 import org.apache.cassandra.hadoop.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +71,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 {
     private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
 
+    public static final int DEFAULT_CQL_PAGE_LIMIT = 1000;
+
     private ColumnFamilySplit split;
     private RowIterator rowIterator;
 
@@ -74,6 +84,12 @@ public class CqlRecordReader extends RecordReader<Long, Row>
     private Cluster cluster;
     private Session session;
     private IPartitioner partitioner;
+    private String inputColumns;
+    private String userDefinedWhereClauses;
+    private int pageRowSize;
+
+    private List<String> partitionKeys = new ArrayList<>();
+    private List<String> clusteringKeys = new ArrayList<>();
 
     // partition keys -- key aliases
     private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
@@ -92,8 +108,18 @@ public class CqlRecordReader extends RecordReader<Long, Row>
                       : ConfigHelper.getInputSplitSize(conf);
         cfName = quote(ConfigHelper.getInputColumnFamily(conf));
         keyspace = quote(ConfigHelper.getInputKeyspace(conf));
-        cqlQuery = CqlConfigHelper.getInputCql(conf);
-        partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context));
+        partitioner = ConfigHelper.getInputPartitioner(conf);
+        inputColumns = CqlConfigHelper.getInputcolumns(conf);
+        userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
+        Optional<Integer> pageRowSizeOptional = CqlConfigHelper.getInputPageRowSize(conf);
+        try
+        {
+            pageRowSize = pageRowSizeOptional.isPresent() ? pageRowSizeOptional.get() : DEFAULT_CQL_PAGE_LIMIT;
+        }
+        catch(NumberFormatException e)
+        {
+            pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
+        }
         try
         {
             if (cluster != null)
@@ -125,6 +151,20 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
         if (cluster != null)
             session = cluster.connect(keyspace);
+
+        if (session == null)
+          throw new RuntimeException("Can't create connection session");
+
+        // If the user provides a CQL query then we will use it without validation
+        // otherwise we will fall back to building a query using the:
+        //   inputColumns
+        //   whereClauses
+        //   pageRowSize
+        cqlQuery = CqlConfigHelper.getInputCql(conf);
+        if (StringUtils.isEmpty(cqlQuery))
+            cqlQuery = buildQuery();
+        logger.debug("cqlQuery {}", cqlQuery);
+
         rowIterator = new RowIterator();
         logger.debug("created {}", rowIterator);
     }
@@ -224,9 +264,6 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
         public RowIterator()
         {
-            if (session == null)
-                throw new RuntimeException("Can't create connection session");
-
             AbstractType type = partitioner.getTokenValidator();
             ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
             for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
@@ -487,6 +524,97 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         }
     }
 
+    /**
+     * Build a query for the reader of the form:
+     *
+     * SELECT * FROM ks.cf WHERE token(pk1,...pkn)>? AND token(pk1,...pkn)<=? [AND user where clauses]
+     * LIMIT pageRowSize [ALLOW FILTERING]
+     */
+    private String buildQuery()
+    {
+        fetchKeys();
+
+        String selectColumnList = makeColumnList(getSelectColumns());
+        String partitionKeyList = makeColumnList(partitionKeys);
+
+        return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + getAdditionalWhereClauses(),
+                             selectColumnList, keyspace, cfName, partitionKeyList, partitionKeyList);
+    }
+
+    private String getAdditionalWhereClauses()
+    {
+        String whereClause = "";
+        if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+            whereClause += " AND " + userDefinedWhereClauses;
+        whereClause += " LIMIT " + pageRowSize;
+        if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+            whereClause += " ALLOW FILTERING";
+        return whereClause;
+    }
+
+    private List<String> getSelectColumns()
+    {
+        List<String> selectColumns = new ArrayList<>();
+
+        if (StringUtils.isEmpty(inputColumns))
+            selectColumns.add("*");
+        else
+        {
+            // We must select all the partition keys plus any other columns the user wants
+            selectColumns.addAll(partitionKeys);
+            for (String column : Splitter.on(',').split(inputColumns))
+            {
+                if (!partitionKeys.contains(column))
+                    selectColumns.add(column);
+            }
+        }
+        return selectColumns;
+    }
+
+    private String makeColumnList(Collection<String> columns)
+    {
+        return Joiner.on(',').join(Iterables.transform(columns, new Function<String, String>()
+        {
+            public String apply(String column)
+            {
+                return quote(column);
+            }
+        }));
+    }
+
+    private void fetchKeys()
+    {
+        String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name='" +
+                       keyspace + "' and columnfamily_name='" + cfName + "'";
+        List<Row> rows = session.execute(query).all();
+        if (CollectionUtils.isEmpty(rows))
+        {
+            throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName);
+        }
+        int numberOfPartitionKeys = 0;
+        for (Row row : rows)
+            if (row.getString(2).equals("partition_key"))
+                numberOfPartitionKeys++;
+        String[] partitionKeyArray = new String[numberOfPartitionKeys];
+        for (Row row : rows)
+        {
+            String type = row.getString(2);
+            String column = row.getString(0);
+            if (type.equals("partition_key"))
+            {
+                int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
+                partitionKeyArray[componentIndex] = column;
+            }
+            else if (type.equals("clustering_key"))
+            {
+                clusteringKeys.add(column);
+            }
+        }
+        partitionKeys.addAll(Arrays.asList(partitionKeyArray));
+    }
+
+
+
     private String quote(String identifier)
     {
         return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
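
To illustrate the new default, here is a hedged sketch (not part of the patch) of a Hadoop job configuration that leaves cassandra.input.cql unset and relies on buildQuery() instead; the keyspace, table and column names are hypothetical, and the commented query is only roughly what would be generated for them:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;

    public class CrrDefaultQueryExample
    {
        public static Configuration configure()
        {
            Configuration conf = new Configuration();
            ConfigHelper.setInputColumnFamily(conf, "ks", "cf");

            // Optional: restrict the selected columns; partition keys are always added back.
            CqlConfigHelper.setInputColumns(conf, "col1,col2");
            // Optional: extra WHERE clauses; their presence makes the reader append ALLOW FILTERING.
            CqlConfigHelper.setInputWhereClauses(conf, "col1 > 5");
            // Optional: LIMIT per query; DEFAULT_CQL_PAGE_LIMIT (1000) is used when unset.
            CqlConfigHelper.setInputCQLPageRowSize(conf, "5000");
            return conf;
        }
    }

    // Assuming a single partition key "id", buildQuery() would produce roughly:
    //   SELECT "id","col1","col2" FROM "ks"."cf"
    //   WHERE token("id")>? AND token("id")<=? AND col1 > 5 LIMIT 5000 ALLOW FILTERING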

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 1e48bf4..eea5d4e 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -177,7 +177,13 @@ public class CqlNativeStorage extends CqlStorage
         setConnectionInformation();
 
         CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
-        CqlConfigHelper.setInputCql(conf, inputCql);
+        if (inputCql != null)
+            CqlConfigHelper.setInputCql(conf, inputCql);
+        if (columns != null)
+            CqlConfigHelper.setInputColumns(conf, columns);
+        if (whereClause != null)
+            CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+
         if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
         {
             try
@@ -270,6 +276,10 @@ public class CqlNativeStorage extends CqlStorage
                     nativeSSLCipherSuites = urlQuery.get("cipher_suites");
                 if (urlQuery.containsKey("input_cql"))
                     inputCql = urlQuery.get("input_cql");
+                if (urlQuery.containsKey("columns"))
+                    columns = urlQuery.get("columns");
+                if (urlQuery.containsKey("where_clause"))
+                    whereClause = urlQuery.get("where_clause");
                 if (urlQuery.containsKey("rpc_port"))
                     rpcPort = urlQuery.get("rpc_port");
             }
@@ -299,7 +309,8 @@ public class CqlNativeStorage extends CqlStorage
                     "[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
                     "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
                     "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
-                    "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage());
+                    "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
+                    "[columns=<columns>][where_clause=<where_clause>]]': " + e.getMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 02a6d98..53f3900 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -59,9 +59,9 @@ public class CqlStorage extends AbstractCassandraStorage
     protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
 
     protected int pageSize = 1000;
-    private String columns;
+    protected String columns;
     protected String outputQuery;
-    private String whereClause;
+    protected String whereClause;
     private boolean hasCompactValueAlias = false;
         
     public CqlStorage()
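
As a usage note for the CqlNativeStorage change above, a hedged, hypothetical example (not part of the patch) of a location string that exercises the new optional "columns" and "where_clause" URL parameters; the keyspace, table and column names are made up, and the where_clause value is shown URL-encoded ("col1 > 5"):

    // Passed to LOAD ... USING org.apache.cassandra.hadoop.pig.CqlNativeStorage() in a Pig script.
    String location = "cql://ks/cf?columns=col1,col2&where_clause=col1%20%3E%205";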


[08/14] git commit: Give CRR a default input_cql Statement

Posted by br...@apache.org.
Give CRR a default input_cql Statement

Patch by Mike Adamson, reviewed by brandonwilliams for CASSANDRA-7226


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52df514d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52df514d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52df514d

Branch: refs/heads/cassandra-2.1
Commit: 52df514dd1a95d4fc4d699d6ffa9d3bf7e844854
Parents: bd0bb6d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:00:39 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:02:34 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 138 ++++++++++++++++++-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  15 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   4 +-
 5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd51e04..ddf4627 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * Give CRR a default input_cql Statement (CASSANDRA-7226)
  * Better error message when adding a collection with the same name
   as a previously dropped one (CASSANDRA-6276)
  * Fix validation when adding static columns (CASSANDRA-7730)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 7a5fd47..b2c8fbf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -60,7 +60,7 @@ import com.google.common.collect.Sets;
 
 public class CqlConfigHelper
 {
-    private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
+    private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns";
     private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
     private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
     private static final String INPUT_CQL = "cassandra.input.cql";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 88c5c33..74310cf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -24,9 +24,17 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
 import org.apache.cassandra.hadoop.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +71,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 {
     private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
 
+    public static final int DEFAULT_CQL_PAGE_LIMIT = 1000;
+
     private ColumnFamilySplit split;
     private RowIterator rowIterator;
 
@@ -74,6 +84,12 @@ public class CqlRecordReader extends RecordReader<Long, Row>
     private Cluster cluster;
     private Session session;
     private IPartitioner partitioner;
+    private String inputColumns;
+    private String userDefinedWhereClauses;
+    private int pageRowSize;
+
+    private List<String> partitionKeys = new ArrayList<>();
+    private List<String> clusteringKeys = new ArrayList<>();
 
     // partition keys -- key aliases
     private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
@@ -92,8 +108,18 @@ public class CqlRecordReader extends RecordReader<Long, Row>
                       : ConfigHelper.getInputSplitSize(conf);
         cfName = quote(ConfigHelper.getInputColumnFamily(conf));
         keyspace = quote(ConfigHelper.getInputKeyspace(conf));
-        cqlQuery = CqlConfigHelper.getInputCql(conf);
-        partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context));
+        partitioner = ConfigHelper.getInputPartitioner(conf);
+        inputColumns = CqlConfigHelper.getInputcolumns(conf);
+        userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
+        Optional<Integer> pageRowSizeOptional = CqlConfigHelper.getInputPageRowSize(conf);
+        try
+        {
+            pageRowSize = pageRowSizeOptional.isPresent() ? pageRowSizeOptional.get() : DEFAULT_CQL_PAGE_LIMIT;
+        }
+        catch(NumberFormatException e)
+        {
+            pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
+        }
         try
         {
             if (cluster != null)
@@ -125,6 +151,20 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
         if (cluster != null)
             session = cluster.connect(keyspace);
+
+        if (session == null)
+          throw new RuntimeException("Can't create connection session");
+
+        // If the user provides a CQL query then we will use it without validation
+        // otherwise we will fall back to building a query using the:
+        //   inputColumns
+        //   whereClauses
+        //   pageRowSize
+        cqlQuery = CqlConfigHelper.getInputCql(conf);
+        if (StringUtils.isEmpty(cqlQuery))
+            cqlQuery = buildQuery();
+        logger.debug("cqlQuery {}", cqlQuery);
+
         rowIterator = new RowIterator();
         logger.debug("created {}", rowIterator);
     }
@@ -224,9 +264,6 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
         public RowIterator()
         {
-            if (session == null)
-                throw new RuntimeException("Can't create connection session");
-
             AbstractType type = partitioner.getTokenValidator();
             ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
             for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
@@ -487,6 +524,97 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         }
     }
 
+    /**
+     * Build a query for the reader of the form:
+     *
+     * SELECT * FROM ks.cf WHERE token(pk1,...pkn)>? AND token(pk1,...pkn)<=? [AND user where clauses]
+     * LIMIT pageRowSize [ALLOW FILTERING]
+     */
+    private String buildQuery()
+    {
+        fetchKeys();
+
+        String selectColumnList = makeColumnList(getSelectColumns());
+        String partitionKeyList = makeColumnList(partitionKeys);
+
+        return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + getAdditionalWhereClauses(),
+                             selectColumnList, keyspace, cfName, partitionKeyList, partitionKeyList);
+    }
+
+    private String getAdditionalWhereClauses()
+    {
+        String whereClause = "";
+        if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+            whereClause += " AND " + userDefinedWhereClauses;
+        whereClause += " LIMIT " + pageRowSize;
+        if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+            whereClause += " ALLOW FILTERING";
+        return whereClause;
+    }
+
+    private List<String> getSelectColumns()
+    {
+        List<String> selectColumns = new ArrayList<>();
+
+        if (StringUtils.isEmpty(inputColumns))
+            selectColumns.add("*");
+        else
+        {
+            // We must select all the partition keys plus any other columns the user wants
+            selectColumns.addAll(partitionKeys);
+            for (String column : Splitter.on(',').split(inputColumns))
+            {
+                if (!partitionKeys.contains(column))
+                    selectColumns.add(column);
+            }
+        }
+        return selectColumns;
+    }
+
+    private String makeColumnList(Collection<String> columns)
+    {
+        return Joiner.on(',').join(Iterables.transform(columns, new Function<String, String>()
+        {
+            public String apply(String column)
+            {
+                return quote(column);
+            }
+        }));
+    }
+
+    private void fetchKeys()
+    {
+        String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name='" +
+                       keyspace + "' and columnfamily_name='" + cfName + "'";
+        List<Row> rows = session.execute(query).all();
+        if (CollectionUtils.isEmpty(rows))
+        {
+            throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName);
+        }
+        int numberOfPartitionKeys = 0;
+        for (Row row : rows)
+            if (row.getString(2).equals("partition_key"))
+                numberOfPartitionKeys++;
+        String[] partitionKeyArray = new String[numberOfPartitionKeys];
+        for (Row row : rows)
+        {
+            String type = row.getString(2);
+            String column = row.getString(0);
+            if (type.equals("partition_key"))
+            {
+                int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
+                partitionKeyArray[componentIndex] = column;
+            }
+            else if (type.equals("clustering_key"))
+            {
+                clusteringKeys.add(column);
+            }
+        }
+        partitionKeys.addAll(Arrays.asList(partitionKeyArray));
+    }
+
+
+
     private String quote(String identifier)
     {
         return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 1e48bf4..eea5d4e 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -177,7 +177,13 @@ public class CqlNativeStorage extends CqlStorage
         setConnectionInformation();
 
         CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
-        CqlConfigHelper.setInputCql(conf, inputCql);
+        if (inputCql != null)
+            CqlConfigHelper.setInputCql(conf, inputCql);
+        if (columns != null)
+            CqlConfigHelper.setInputColumns(conf, columns);
+        if (whereClause != null)
+            CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+
         if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
         {
             try
@@ -270,6 +276,10 @@ public class CqlNativeStorage extends CqlStorage
                     nativeSSLCipherSuites = urlQuery.get("cipher_suites");
                 if (urlQuery.containsKey("input_cql"))
                     inputCql = urlQuery.get("input_cql");
+                if (urlQuery.containsKey("columns"))
+                    columns = urlQuery.get("columns");
+                if (urlQuery.containsKey("where_clause"))
+                    whereClause = urlQuery.get("where_clause");
                 if (urlQuery.containsKey("rpc_port"))
                     rpcPort = urlQuery.get("rpc_port");
             }
@@ -299,7 +309,8 @@ public class CqlNativeStorage extends CqlStorage
                     "[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
                     "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
                     "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
-                    "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage());
+                    "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
+                    "[columns=<columns>][where_clause=<where_clause>]]': " + e.getMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 02a6d98..53f3900 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -59,9 +59,9 @@ public class CqlStorage extends AbstractCassandraStorage
     protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
 
     protected int pageSize = 1000;
-    private String columns;
+    protected String columns;
     protected String outputQuery;
-    private String whereClause;
+    protected String whereClause;
     private boolean hasCompactValueAlias = false;
         
     public CqlStorage()


[09/14] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0

Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1.0

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d744b5d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d744b5d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d744b5d

Branch: refs/heads/cassandra-2.1
Commit: 1d744b5d4d2c18c3f43fd8a1cdd33b333186d290
Parents: ce96a2a 52df514d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:03:13 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:03:13 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 138 ++++++++++++++++++-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  15 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   4 +-
 5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 64b4d65,ddf4627..f34b912
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
 -2.0.10
 +2.1.0-rc6
 + * Include stress yaml example in release and deb (CASSANDRA-7717)
 + * workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695)
 + * cqlsh DESC CLUSTER fails retrieving ring information (CASSANDRA-7687)
 + * Fix binding null values inside UDT (CASSANDRA-7685)
 + * Fix UDT field selection with empty fields (CASSANDRA-7670)
 + * Bogus deserialization of static cells from sstable (CASSANDRA-7684)
 +Merged from 2.0:
+  * Give CRR a default input_cql Statement (CASSANDRA-7226)
   * Better error message when adding a collection with the same name
    as a previously dropped one (CASSANDRA-6276)
   * Fix validation when adding static columns (CASSANDRA-7730)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------


[06/14] git commit: Give CRR a default input_cql Statement

Posted by br...@apache.org.
Give CRR a default input_cql Statement

Patch by Mike Adamson, reviewed by brandonwilliams for CASSANDRA-7226


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52df514d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52df514d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52df514d

Branch: refs/heads/trunk
Commit: 52df514dd1a95d4fc4d699d6ffa9d3bf7e844854
Parents: bd0bb6d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:00:39 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:02:34 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 138 ++++++++++++++++++-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  15 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   4 +-
 5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd51e04..ddf4627 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * Give CRR a default input_cql Statement (CASSANDRA-7226)
  * Better error message when adding a collection with the same name
    than a previously dropped one (CASSANDRA-6276)
  * Fix validation when adding static columns (CASSANDRA-7730)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 7a5fd47..b2c8fbf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -60,7 +60,7 @@ import com.google.common.collect.Sets;
 
 public class CqlConfigHelper
 {
-    private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
+    private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns";
     private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
     private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
     private static final String INPUT_CQL = "cassandra.input.cql";
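
For readers wiring these settings up by hand: the constants above are ordinary Hadoop
Configuration keys, so the CqlConfigHelper setters used elsewhere in this patch should be
equivalent to plain conf.set(...) calls on those keys. A minimal sketch of a hypothetical
job setup; the column list and where clause below are made up for illustration.

    import org.apache.hadoop.conf.Configuration;

    public class RawInputCqlConfigSketch
    {
        public static void main(String[] args)
        {
            Configuration conf = new Configuration();

            // Should be equivalent to CqlConfigHelper.setInputColumns(conf, "ts,payload"):
            conf.set("cassandra.input.columnfamily.columns", "ts,payload");

            // Should be equivalent to CqlConfigHelper.setInputWhereClauses(conf, "ts > 0"):
            conf.set("cassandra.input.where.clause", "ts > 0");

            // Should be equivalent to CqlConfigHelper.setInputCQLPageRowSize(conf, "1000"):
            conf.set("cassandra.input.page.row.size", "1000");

            // If "cassandra.input.cql" is left unset, CqlRecordReader (patched below)
            // now builds a default token-range query from the three values above
            // (CASSANDRA-7226).
        }
    }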

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 88c5c33..74310cf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -24,9 +24,17 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
 import org.apache.cassandra.hadoop.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +71,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 {
     private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
 
+    public static final int DEFAULT_CQL_PAGE_LIMIT = 1000;
+
     private ColumnFamilySplit split;
     private RowIterator rowIterator;
 
@@ -74,6 +84,12 @@ public class CqlRecordReader extends RecordReader<Long, Row>
     private Cluster cluster;
     private Session session;
     private IPartitioner partitioner;
+    private String inputColumns;
+    private String userDefinedWhereClauses;
+    private int pageRowSize;
+
+    private List<String> partitionKeys = new ArrayList<>();
+    private List<String> clusteringKeys = new ArrayList<>();
 
     // partition keys -- key aliases
     private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
@@ -92,8 +108,18 @@ public class CqlRecordReader extends RecordReader<Long, Row>
                       : ConfigHelper.getInputSplitSize(conf);
         cfName = quote(ConfigHelper.getInputColumnFamily(conf));
         keyspace = quote(ConfigHelper.getInputKeyspace(conf));
-        cqlQuery = CqlConfigHelper.getInputCql(conf);
-        partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context));
+        partitioner = ConfigHelper.getInputPartitioner(conf);
+        inputColumns = CqlConfigHelper.getInputcolumns(conf);
+        userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
+        Optional<Integer> pageRowSizeOptional = CqlConfigHelper.getInputPageRowSize(conf);
+        try
+        {
+            pageRowSize = pageRowSizeOptional.isPresent() ? pageRowSizeOptional.get() : DEFAULT_CQL_PAGE_LIMIT;
+        }
+        catch(NumberFormatException e)
+        {
+            pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
+        }
         try
         {
             if (cluster != null)
@@ -125,6 +151,20 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
         if (cluster != null)
             session = cluster.connect(keyspace);
+
+        if (session == null)
+          throw new RuntimeException("Can't create connection session");
+
+        // If the user provides a CQL query then we will use it without validation
+        // otherwise we will fall back to building a query using the:
+        //   inputColumns
+        //   whereClauses
+        //   pageRowSize
+        cqlQuery = CqlConfigHelper.getInputCql(conf);
+        if (StringUtils.isEmpty(cqlQuery))
+            cqlQuery = buildQuery();
+        logger.debug("cqlQuery {}", cqlQuery);
+
         rowIterator = new RowIterator();
         logger.debug("created {}", rowIterator);
     }
@@ -224,9 +264,6 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
         public RowIterator()
         {
-            if (session == null)
-                throw new RuntimeException("Can't create connection session");
-
             AbstractType type = partitioner.getTokenValidator();
             ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
             for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
@@ -487,6 +524,97 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         }
     }
 
+    /**
+     * Build a query for the reader of the form:
+     *
+     * SELECT * FROM ks.cf WHERE token(pk1,...pkn)>? AND token(pk1,...pkn)<=? [AND user where clauses]
+     * LIMIT pageRowSize [ALLOW FILTERING]
+     */
+    private String buildQuery()
+    {
+        fetchKeys();
+
+        String selectColumnList = makeColumnList(getSelectColumns());
+        String partitionKeyList = makeColumnList(partitionKeys);
+
+        return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + getAdditionalWhereClauses(),
+                             selectColumnList, keyspace, cfName, partitionKeyList, partitionKeyList);
+    }
+
+    private String getAdditionalWhereClauses()
+    {
+        String whereClause = "";
+        if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+            whereClause += " AND " + userDefinedWhereClauses;
+        whereClause += " LIMIT " + pageRowSize;
+        if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+            whereClause += " ALLOW FILTERING";
+        return whereClause;
+    }
+
+    private List<String> getSelectColumns()
+    {
+        List<String> selectColumns = new ArrayList<>();
+
+        if (StringUtils.isEmpty(inputColumns))
+            selectColumns.add("*");
+        else
+        {
+            // We must select all the partition keys plus any other columns the user wants
+            selectColumns.addAll(partitionKeys);
+            for (String column : Splitter.on(',').split(inputColumns))
+            {
+                if (!partitionKeys.contains(column))
+                    selectColumns.add(column);
+            }
+        }
+        return selectColumns;
+    }
+
+    private String makeColumnList(Collection<String> columns)
+    {
+        return Joiner.on(',').join(Iterables.transform(columns, new Function<String, String>()
+        {
+            public String apply(String column)
+            {
+                return quote(column);
+            }
+        }));
+    }
+
+    private void fetchKeys()
+    {
+        String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name='" +
+                       keyspace + "' and columnfamily_name='" + cfName + "'";
+        List<Row> rows = session.execute(query).all();
+        if (CollectionUtils.isEmpty(rows))
+        {
+            throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName);
+        }
+        int numberOfPartitionKeys = 0;
+        for (Row row : rows)
+            if (row.getString(2).equals("partition_key"))
+                numberOfPartitionKeys++;
+        String[] partitionKeyArray = new String[numberOfPartitionKeys];
+        for (Row row : rows)
+        {
+            String type = row.getString(2);
+            String column = row.getString(0);
+            if (type.equals("partition_key"))
+            {
+                int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
+                partitionKeyArray[componentIndex] = column;
+            }
+            else if (type.equals("clustering_key"))
+            {
+                clusteringKeys.add(column);
+            }
+        }
+        partitionKeys.addAll(Arrays.asList(partitionKeyArray));
+    }
+
+
+
     private String quote(String identifier)
     {
         return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
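
To make the new default concrete: when cassandra.input.cql is not set, buildQuery() above
yields a token-range query of the shape sketched below. This is a stand-alone illustration,
not code from the patch; the keyspace, table, columns and where clause are hypothetical, and
DEFAULT_CQL_PAGE_LIMIT (1000) is used when no page row size is configured.

    public class DefaultQueryShapeSketch
    {
        public static void main(String[] args)
        {
            // Hypothetical inputs, mirroring the fields CqlRecordReader collects:
            String keyspace = "\"demo_ks\"";
            String cfName = "\"events\"";
            String selectColumnList = "\"id\",\"ts\",\"payload\"";  // partition keys first, then user columns
            String partitionKeyList = "\"id\"";
            String userDefinedWhereClauses = "ts > 0";
            int pageRowSize = 1000;

            // Same construction as buildQuery() + getAdditionalWhereClauses():
            String where = "";
            if (!userDefinedWhereClauses.isEmpty())
                where += " AND " + userDefinedWhereClauses;
            where += " LIMIT " + pageRowSize;
            if (!userDefinedWhereClauses.isEmpty())
                where += " ALLOW FILTERING";  // only appended when a user where clause is present

            String cqlQuery = String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + where,
                                            selectColumnList, keyspace, cfName, partitionKeyList, partitionKeyList);
            System.out.println(cqlQuery);
            // -> SELECT "id","ts","payload" FROM "demo_ks"."events"
            //    WHERE token("id")>? AND token("id")<=? AND ts > 0 LIMIT 1000 ALLOW FILTERING
        }
    }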

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 1e48bf4..eea5d4e 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -177,7 +177,13 @@ public class CqlNativeStorage extends CqlStorage
         setConnectionInformation();
 
         CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
-        CqlConfigHelper.setInputCql(conf, inputCql);
+        if (inputCql != null)
+            CqlConfigHelper.setInputCql(conf, inputCql);
+        if (columns != null)
+            CqlConfigHelper.setInputColumns(conf, columns);
+        if (whereClause != null)
+            CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+
         if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
         {
             try
@@ -270,6 +276,10 @@ public class CqlNativeStorage extends CqlStorage
                     nativeSSLCipherSuites = urlQuery.get("cipher_suites");
                 if (urlQuery.containsKey("input_cql"))
                     inputCql = urlQuery.get("input_cql");
+                if (urlQuery.containsKey("columns"))
+                    columns = urlQuery.get("columns");
+                if (urlQuery.containsKey("where_clause"))
+                    whereClause = urlQuery.get("where_clause");
                 if (urlQuery.containsKey("rpc_port"))
                     rpcPort = urlQuery.get("rpc_port");
             }
@@ -299,7 +309,8 @@ public class CqlNativeStorage extends CqlStorage
                     "[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
                     "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
                     "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
-                    "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage());
+                    "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
+                    "[&columns=<columns>][&where_clause=<where_clause>]]': " + e.getMessage());
         }
     }
 

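
On the Pig side, CqlNativeStorage now forwards the optional columns and where_clause
location-URL parameters into the Hadoop configuration, and only when they are actually
present, so an explicit input_cql statement is no longer required. A minimal driver-style
sketch of that forwarding logic, with hypothetical values (not code from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;

    public class NativeStorageForwardingSketch
    {
        public static void main(String[] args)
        {
            Configuration conf = new Configuration();

            // Values that would come from a location URL such as
            // ?columns=ts,payload&where_clause=... (hypothetical):
            String inputCql = null;          // no explicit query supplied
            String columns = "ts,payload";
            String whereClause = "ts > 0";

            CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(1000));
            if (inputCql != null)
                CqlConfigHelper.setInputCql(conf, inputCql);
            if (columns != null)
                CqlConfigHelper.setInputColumns(conf, columns);
            if (whereClause != null)
                CqlConfigHelper.setInputWhereClauses(conf, whereClause);

            // With input_cql absent, CqlRecordReader falls back to the generated
            // token-range query illustrated after the CqlRecordReader.java diff above.
        }
    }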
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 02a6d98..53f3900 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -59,9 +59,9 @@ public class CqlStorage extends AbstractCassandraStorage
     protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
 
     protected int pageSize = 1000;
-    private String columns;
+    protected String columns;
     protected String outputQuery;
-    private String whereClause;
+    protected String whereClause;
     private boolean hasCompactValueAlias = false;
         
     public CqlStorage()


[11/14] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0

Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1.0

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d744b5d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d744b5d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d744b5d

Branch: refs/heads/trunk
Commit: 1d744b5d4d2c18c3f43fd8a1cdd33b333186d290
Parents: ce96a2a 52df514d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:03:13 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:03:13 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 138 ++++++++++++++++++-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  15 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   4 +-
 5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 64b4d65,ddf4627..f34b912
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
 -2.0.10
 +2.1.0-rc6
 + * Include stress yaml example in release and deb (CASSANDRA-7717)
 + * workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695)
 + * cqlsh DESC CLUSTER fails retrieving ring information (CASSANDRA-7687)
 + * Fix binding null values inside UDT (CASSANDRA-7685)
 + * Fix UDT field selection with empty fields (CASSANDRA-7670)
 + * Bogus deserialization of static cells from sstable (CASSANDRA-7684)
 +Merged from 2.0:
+  * Give CRR a default input_cql Statement (CASSANDRA-7226)
   * Better error message when adding a collection with the same name
     than a previously dropped one (CASSANDRA-6276)
   * Fix validation when adding static columns (CASSANDRA-7730)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------


[05/14] git commit: Give CRR a default input_cql Statement

Posted by br...@apache.org.
Give CRR a default input_cql Statement

Patch by Mike Adamson, reviewed by brandonwilliams for CASSANDRA-7226


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52df514d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52df514d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52df514d

Branch: refs/heads/cassandra-2.1.0
Commit: 52df514dd1a95d4fc4d699d6ffa9d3bf7e844854
Parents: bd0bb6d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:00:39 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:02:34 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 138 ++++++++++++++++++-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  15 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   4 +-
 5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd51e04..ddf4627 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * Give CRR a default input_cql Statement (CASSANDRA-7226)
  * Better error message when adding a collection with the same name
    than a previously dropped one (CASSANDRA-6276)
  * Fix validation when adding static columns (CASSANDRA-7730)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 7a5fd47..b2c8fbf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -60,7 +60,7 @@ import com.google.common.collect.Sets;
 
 public class CqlConfigHelper
 {
-    private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
+    private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns";
     private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
     private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
     private static final String INPUT_CQL = "cassandra.input.cql";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 88c5c33..74310cf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -24,9 +24,17 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
 import org.apache.cassandra.hadoop.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +71,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 {
     private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
 
+    public static final int DEFAULT_CQL_PAGE_LIMIT = 1000;
+
     private ColumnFamilySplit split;
     private RowIterator rowIterator;
 
@@ -74,6 +84,12 @@ public class CqlRecordReader extends RecordReader<Long, Row>
     private Cluster cluster;
     private Session session;
     private IPartitioner partitioner;
+    private String inputColumns;
+    private String userDefinedWhereClauses;
+    private int pageRowSize;
+
+    private List<String> partitionKeys = new ArrayList<>();
+    private List<String> clusteringKeys = new ArrayList<>();
 
     // partition keys -- key aliases
     private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
@@ -92,8 +108,18 @@ public class CqlRecordReader extends RecordReader<Long, Row>
                       : ConfigHelper.getInputSplitSize(conf);
         cfName = quote(ConfigHelper.getInputColumnFamily(conf));
         keyspace = quote(ConfigHelper.getInputKeyspace(conf));
-        cqlQuery = CqlConfigHelper.getInputCql(conf);
-        partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context));
+        partitioner = ConfigHelper.getInputPartitioner(conf);
+        inputColumns = CqlConfigHelper.getInputcolumns(conf);
+        userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
+        Optional<Integer> pageRowSizeOptional = CqlConfigHelper.getInputPageRowSize(conf);
+        try
+        {
+            pageRowSize = pageRowSizeOptional.isPresent() ? pageRowSizeOptional.get() : DEFAULT_CQL_PAGE_LIMIT;
+        }
+        catch(NumberFormatException e)
+        {
+            pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
+        }
         try
         {
             if (cluster != null)
@@ -125,6 +151,20 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
         if (cluster != null)
             session = cluster.connect(keyspace);
+
+        if (session == null)
+          throw new RuntimeException("Can't create connection session");
+
+        // If the user provides a CQL query then we will use it without validation
+        // otherwise we will fall back to building a query using the:
+        //   inputColumns
+        //   whereClauses
+        //   pageRowSize
+        cqlQuery = CqlConfigHelper.getInputCql(conf);
+        if (StringUtils.isEmpty(cqlQuery))
+            cqlQuery = buildQuery();
+        logger.debug("cqlQuery {}", cqlQuery);
+
         rowIterator = new RowIterator();
         logger.debug("created {}", rowIterator);
     }
@@ -224,9 +264,6 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
         public RowIterator()
         {
-            if (session == null)
-                throw new RuntimeException("Can't create connection session");
-
             AbstractType type = partitioner.getTokenValidator();
             ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
             for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
@@ -487,6 +524,97 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         }
     }
 
+    /**
+     * Build a query for the reader of the form:
+     *
+     * SELECT * FROM ks.cf WHERE token(pk1,...pkn)>? AND token(pk1,...pkn)<=? [AND user where clauses]
+     * LIMIT pageRowSize [ALLOW FILTERING]
+     */
+    private String buildQuery()
+    {
+        fetchKeys();
+
+        String selectColumnList = makeColumnList(getSelectColumns());
+        String partitionKeyList = makeColumnList(partitionKeys);
+
+        return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + getAdditionalWhereClauses(),
+                             selectColumnList, keyspace, cfName, partitionKeyList, partitionKeyList);
+    }
+
+    private String getAdditionalWhereClauses()
+    {
+        String whereClause = "";
+        if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+            whereClause += " AND " + userDefinedWhereClauses;
+        whereClause += " LIMIT " + pageRowSize;
+        if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+            whereClause += " ALLOW FILTERING";
+        return whereClause;
+    }
+
+    private List<String> getSelectColumns()
+    {
+        List<String> selectColumns = new ArrayList<>();
+
+        if (StringUtils.isEmpty(inputColumns))
+            selectColumns.add("*");
+        else
+        {
+            // We must select all the partition keys plus any other columns the user wants
+            selectColumns.addAll(partitionKeys);
+            for (String column : Splitter.on(',').split(inputColumns))
+            {
+                if (!partitionKeys.contains(column))
+                    selectColumns.add(column);
+            }
+        }
+        return selectColumns;
+    }
+
+    private String makeColumnList(Collection<String> columns)
+    {
+        return Joiner.on(',').join(Iterables.transform(columns, new Function<String, String>()
+        {
+            public String apply(String column)
+            {
+                return quote(column);
+            }
+        }));
+    }
+
+    private void fetchKeys()
+    {
+        String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name='" +
+                       keyspace + "' and columnfamily_name='" + cfName + "'";
+        List<Row> rows = session.execute(query).all();
+        if (CollectionUtils.isEmpty(rows))
+        {
+            throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName);
+        }
+        int numberOfPartitionKeys = 0;
+        for (Row row : rows)
+            if (row.getString(2).equals("partition_key"))
+                numberOfPartitionKeys++;
+        String[] partitionKeyArray = new String[numberOfPartitionKeys];
+        for (Row row : rows)
+        {
+            String type = row.getString(2);
+            String column = row.getString(0);
+            if (type.equals("partition_key"))
+            {
+                int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
+                partitionKeyArray[componentIndex] = column;
+            }
+            else if (type.equals("clustering_key"))
+            {
+                clusteringKeys.add(column);
+            }
+        }
+        partitionKeys.addAll(Arrays.asList(partitionKeyArray));
+    }
+
+
+
     private String quote(String identifier)
     {
         return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 1e48bf4..eea5d4e 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -177,7 +177,13 @@ public class CqlNativeStorage extends CqlStorage
         setConnectionInformation();
 
         CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
-        CqlConfigHelper.setInputCql(conf, inputCql);
+        if (inputCql != null)
+            CqlConfigHelper.setInputCql(conf, inputCql);
+        if (columns != null)
+            CqlConfigHelper.setInputColumns(conf, columns);
+        if (whereClause != null)
+            CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+
         if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
         {
             try
@@ -270,6 +276,10 @@ public class CqlNativeStorage extends CqlStorage
                     nativeSSLCipherSuites = urlQuery.get("cipher_suites");
                 if (urlQuery.containsKey("input_cql"))
                     inputCql = urlQuery.get("input_cql");
+                if (urlQuery.containsKey("columns"))
+                    columns = urlQuery.get("columns");
+                if (urlQuery.containsKey("where_clause"))
+                    whereClause = urlQuery.get("where_clause");
                 if (urlQuery.containsKey("rpc_port"))
                     rpcPort = urlQuery.get("rpc_port");
             }
@@ -299,7 +309,8 @@ public class CqlNativeStorage extends CqlStorage
                     "[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
                     "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
                     "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
-                    "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage());
+                    "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
+                    "[&columns=<columns>][&where_clause=<where_clause>]]': " + e.getMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 02a6d98..53f3900 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -59,9 +59,9 @@ public class CqlStorage extends AbstractCassandraStorage
     protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
 
     protected int pageSize = 1000;
-    private String columns;
+    protected String columns;
     protected String outputQuery;
-    private String whereClause;
+    protected String whereClause;
     private boolean hasCompactValueAlias = false;
         
     public CqlStorage()


[12/14] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1

Posted by br...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0c6078a7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0c6078a7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0c6078a7

Branch: refs/heads/trunk
Commit: 0c6078a70cfa83f851c1d2b9ec039e2f00161bfc
Parents: 6eb76a2 1d744b5
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:03:32 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:03:32 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 138 ++++++++++++++++++-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  15 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   4 +-
 5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c6078a7/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c6078a7/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0c6078a7/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------


[10/14] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0

Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1.0

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d744b5d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d744b5d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d744b5d

Branch: refs/heads/cassandra-2.1.0
Commit: 1d744b5d4d2c18c3f43fd8a1cdd33b333186d290
Parents: ce96a2a 52df514d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:03:13 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:03:13 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 138 ++++++++++++++++++-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  15 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   4 +-
 5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 64b4d65,ddf4627..f34b912
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
 -2.0.10
 +2.1.0-rc6
 + * Include stress yaml example in release and deb (CASSANDRA-7717)
 + * workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695)
 + * cqlsh DESC CLUSTER fails retrieving ring information (CASSANDRA-7687)
 + * Fix binding null values inside UDT (CASSANDRA-7685)
 + * Fix UDT field selection with empty fields (CASSANDRA-7670)
 + * Bogus deserialization of static cells from sstable (CASSANDRA-7684)
 +Merged from 2.0:
+  * Give CRR a default input_cql Statement (CASSANDRA-7226)
   * Better error message when adding a collection with the same name
     than a previously dropped one (CASSANDRA-6276)
   * Fix validation when adding static columns (CASSANDRA-7730)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d744b5d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------