You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/08/12 20:04:06 UTC
[03/10] git commit: Fix CRR, add pig test for it
Fix CqlRecordReader (CRR), add pig test for it
Patch by Mike Adamson, reviewed by brandonwilliams for CASSANDRA-7726
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7049ee0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7049ee0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7049ee0e
Branch: refs/heads/cassandra-2.1
Commit: 7049ee0e2bdb37a0dc82fa849462ffd375a20e85
Parents: f7e8803
Author: Brandon Williams <br...@apache.org>
Authored: Tue Aug 12 12:33:46 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Aug 12 12:34:14 2014 -0500
----------------------------------------------------------------------
.../cassandra/hadoop/cql3/CqlRecordReader.java | 28 ++++++++++----------
1 file changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7049ee0e/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 74310cf..fa8dec9 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -89,7 +89,6 @@ public class CqlRecordReader extends RecordReader<Long, Row>
private int pageRowSize;
private List<String> partitionKeys = new ArrayList<>();
- private List<String> clusteringKeys = new ArrayList<>();
// partition keys -- key aliases
private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
@@ -106,8 +105,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
totalRowCount = (this.split.getLength() < Long.MAX_VALUE)
? (int) this.split.getLength()
: ConfigHelper.getInputSplitSize(conf);
- cfName = quote(ConfigHelper.getInputColumnFamily(conf));
- keyspace = quote(ConfigHelper.getInputKeyspace(conf));
+ cfName = ConfigHelper.getInputColumnFamily(conf);
+ keyspace = ConfigHelper.getInputKeyspace(conf);
partitioner = ConfigHelper.getInputPartitioner(conf);
inputColumns = CqlConfigHelper.getInputcolumns(conf);
userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
@@ -161,6 +160,14 @@ public class CqlRecordReader extends RecordReader<Long, Row>
// whereClauses
// pageRowSize
cqlQuery = CqlConfigHelper.getInputCql(conf);
+ // validate that the user hasn't tried to give us a custom query along with input columns
+ // and where clauses
+ if (StringUtils.isNotEmpty(cqlQuery) && (StringUtils.isNotEmpty(inputColumns) ||
+ StringUtils.isNotEmpty(userDefinedWhereClauses)))
+ {
+ throw new AssertionError("Cannot define a custom query with input columns and / or where clauses");
+ }
+
if (StringUtils.isEmpty(cqlQuery))
cqlQuery = buildQuery();
logger.debug("cqlQuery {}", cqlQuery);
@@ -266,7 +273,7 @@ public class CqlRecordReader extends RecordReader<Long, Row>
{
AbstractType type = partitioner.getTokenValidator();
ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
- for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
+ for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(quote(keyspace)).getTable(quote(cfName)).getPartitionKey())
partitionBoundColumns.put(meta.getName(), Boolean.TRUE);
rows = rs.iterator();
}
@@ -534,7 +541,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
{
fetchKeys();
- String selectColumnList = makeColumnList(getSelectColumns());
+ List<String> columns = getSelectColumns();
+ String selectColumnList = columns.size() == 0 ? "*" : makeColumnList(columns);
String partitionKeyList = makeColumnList(partitionKeys);
return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + getAdditionalWhereClauses(),
@@ -556,9 +564,7 @@ public class CqlRecordReader extends RecordReader<Long, Row>
{
List<String> selectColumns = new ArrayList<>();
- if (StringUtils.isEmpty(inputColumns))
- selectColumns.add("*");
- else
+ if (StringUtils.isNotEmpty(inputColumns))
{
// We must select all the partition keys plus any other columns the user wants
selectColumns.addAll(partitionKeys);
@@ -605,16 +611,10 @@ public class CqlRecordReader extends RecordReader<Long, Row>
int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
partitionKeyArray[componentIndex] = column;
}
- else if (type.equals("clustering_key"))
- {
- clusteringKeys.add(column);
- }
}
partitionKeys.addAll(Arrays.asList(partitionKeyArray));
}
-
-
private String quote(String identifier)
{
return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";