You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/08/12 20:04:06 UTC

[03/10] git commit: Fix CRR, add pig test for it

Fix CqlRecordReader (CRR), add pig test for it

Patch by Mike Adamson, reviewed by brandonwilliams for CASSANDRA-7726


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7049ee0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7049ee0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7049ee0e

Branch: refs/heads/cassandra-2.1
Commit: 7049ee0e2bdb37a0dc82fa849462ffd375a20e85
Parents: f7e8803
Author: Brandon Williams <br...@apache.org>
Authored: Tue Aug 12 12:33:46 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Aug 12 12:34:14 2014 -0500

----------------------------------------------------------------------
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 28 ++++++++++----------
 1 file changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7049ee0e/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 74310cf..fa8dec9 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -89,7 +89,6 @@ public class CqlRecordReader extends RecordReader<Long, Row>
     private int pageRowSize;
 
     private List<String> partitionKeys = new ArrayList<>();
-    private List<String> clusteringKeys = new ArrayList<>();
 
     // partition keys -- key aliases
     private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
@@ -106,8 +105,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
                       ? (int) this.split.getLength()
                       : ConfigHelper.getInputSplitSize(conf);
-        cfName = quote(ConfigHelper.getInputColumnFamily(conf));
-        keyspace = quote(ConfigHelper.getInputKeyspace(conf));
+        cfName = ConfigHelper.getInputColumnFamily(conf);
+        keyspace = ConfigHelper.getInputKeyspace(conf);
         partitioner = ConfigHelper.getInputPartitioner(conf);
         inputColumns = CqlConfigHelper.getInputcolumns(conf);
         userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
@@ -161,6 +160,14 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         //   whereClauses
         //   pageRowSize
         cqlQuery = CqlConfigHelper.getInputCql(conf);
+        // validate that the user hasn't tried to give us a custom query along with input columns
+        // and/or where clauses
+        if (StringUtils.isNotEmpty(cqlQuery) && (StringUtils.isNotEmpty(inputColumns) ||
+                                                 StringUtils.isNotEmpty(userDefinedWhereClauses)))
+        {
+            throw new AssertionError("Cannot define a custom query with input columns and / or where clauses");
+        }
+
         if (StringUtils.isEmpty(cqlQuery))
             cqlQuery = buildQuery();
         logger.debug("cqlQuery {}", cqlQuery);
@@ -266,7 +273,7 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         {
             AbstractType type = partitioner.getTokenValidator();
             ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
-            for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
+            for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(quote(keyspace)).getTable(quote(cfName)).getPartitionKey())
                 partitionBoundColumns.put(meta.getName(), Boolean.TRUE);
             rows = rs.iterator();
         }
@@ -534,7 +541,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
     {
         fetchKeys();
 
-        String selectColumnList = makeColumnList(getSelectColumns());
+        List<String> columns = getSelectColumns();
+        String selectColumnList = columns.size() == 0 ? "*" : makeColumnList(columns);
         String partitionKeyList = makeColumnList(partitionKeys);
 
         return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + getAdditionalWhereClauses(),
@@ -556,9 +564,7 @@ public class CqlRecordReader extends RecordReader<Long, Row>
     {
         List<String> selectColumns = new ArrayList<>();
 
-        if (StringUtils.isEmpty(inputColumns))
-            selectColumns.add("*");
-        else
+        if (StringUtils.isNotEmpty(inputColumns))
         {
             // We must select all the partition keys plus any other columns the user wants
             selectColumns.addAll(partitionKeys);
@@ -605,16 +611,10 @@ public class CqlRecordReader extends RecordReader<Long, Row>
                 int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
                 partitionKeyArray[componentIndex] = column;
             }
-            else if (type.equals("clustering_key"))
-            {
-                clusteringKeys.add(column);
-            }
         }
         partitionKeys.addAll(Arrays.asList(partitionKeyArray));
     }
 
-
-
     private String quote(String identifier)
     {
         return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";