You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/08/11 20:04:00 UTC
[05/14] git commit: Give CRR a default input_cql Statement
Give CRR a default input_cql Statement
Patch by Mike Adamson, reviewed by brandonwilliams for CASSANDRA-7226
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52df514d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52df514d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52df514d
Branch: refs/heads/cassandra-2.1.0
Commit: 52df514dd1a95d4fc4d699d6ffa9d3bf7e844854
Parents: bd0bb6d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:00:39 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:02:34 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 2 +-
.../cassandra/hadoop/cql3/CqlRecordReader.java | 138 ++++++++++++++++++-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 15 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 4 +-
5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd51e04..ddf4627 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.10
+ * Give CRR a default input_cql Statement (CASSANDRA-7226)
* Better error message when adding a collection with the same name
as a previously dropped one (CASSANDRA-6276)
* Fix validation when adding static columns (CASSANDRA-7730)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 7a5fd47..b2c8fbf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -60,7 +60,7 @@ import com.google.common.collect.Sets;
public class CqlConfigHelper
{
- private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
+ private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns";
private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
private static final String INPUT_CQL = "cassandra.input.cql";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 88c5c33..74310cf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -24,9 +24,17 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
import org.apache.cassandra.hadoop.HadoopCompat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +71,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
{
private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
+ public static final int DEFAULT_CQL_PAGE_LIMIT = 1000;
+
private ColumnFamilySplit split;
private RowIterator rowIterator;
@@ -74,6 +84,12 @@ public class CqlRecordReader extends RecordReader<Long, Row>
private Cluster cluster;
private Session session;
private IPartitioner partitioner;
+ private String inputColumns;
+ private String userDefinedWhereClauses;
+ private int pageRowSize;
+
+ private List<String> partitionKeys = new ArrayList<>();
+ private List<String> clusteringKeys = new ArrayList<>();
// partition keys -- key aliases
private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
@@ -92,8 +108,18 @@ public class CqlRecordReader extends RecordReader<Long, Row>
: ConfigHelper.getInputSplitSize(conf);
cfName = quote(ConfigHelper.getInputColumnFamily(conf));
keyspace = quote(ConfigHelper.getInputKeyspace(conf));
- cqlQuery = CqlConfigHelper.getInputCql(conf);
- partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context));
+ partitioner = ConfigHelper.getInputPartitioner(conf);
+ inputColumns = CqlConfigHelper.getInputcolumns(conf);
+ userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
+ Optional<Integer> pageRowSizeOptional = CqlConfigHelper.getInputPageRowSize(conf);
+ try
+ {
+ pageRowSize = pageRowSizeOptional.isPresent() ? pageRowSizeOptional.get() : DEFAULT_CQL_PAGE_LIMIT;
+ }
+ catch(NumberFormatException e)
+ {
+ pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
+ }
try
{
if (cluster != null)
@@ -125,6 +151,20 @@ public class CqlRecordReader extends RecordReader<Long, Row>
if (cluster != null)
session = cluster.connect(keyspace);
+
+ if (session == null)
+ throw new RuntimeException("Can't create connection session");
+
+ // If the user provides a CQL query then we will use it without validation
+ // otherwise we will fall back to building a query using the:
+ // inputColumns
+ // whereClauses
+ // pageRowSize
+ cqlQuery = CqlConfigHelper.getInputCql(conf);
+ if (StringUtils.isEmpty(cqlQuery))
+ cqlQuery = buildQuery();
+ logger.debug("cqlQuery {}", cqlQuery);
+
rowIterator = new RowIterator();
logger.debug("created {}", rowIterator);
}
@@ -224,9 +264,6 @@ public class CqlRecordReader extends RecordReader<Long, Row>
public RowIterator()
{
- if (session == null)
- throw new RuntimeException("Can't create connection session");
-
AbstractType type = partitioner.getTokenValidator();
ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
@@ -487,6 +524,97 @@ public class CqlRecordReader extends RecordReader<Long, Row>
}
}
+ /**
+ * Build a query for the reader of the form:
+ *
+ * SELECT * FROM ks.cf WHERE token(pk1,...pkn)>? AND token(pk1,...pkn)<=? [AND user where clauses]
+ * LIMIT pageRowSize [ALLOW FILTERING]
+ */
+ private String buildQuery()
+ {
+ fetchKeys();
+
+ String selectColumnList = makeColumnList(getSelectColumns());
+ String partitionKeyList = makeColumnList(partitionKeys);
+
+ return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + getAdditionalWhereClauses(),
+ selectColumnList, keyspace, cfName, partitionKeyList, partitionKeyList);
+ }
+
+ private String getAdditionalWhereClauses()
+ {
+ String whereClause = "";
+ if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+ whereClause += " AND " + userDefinedWhereClauses;
+ whereClause += " LIMIT " + pageRowSize;
+ if (StringUtils.isNotEmpty(userDefinedWhereClauses))
+ whereClause += " ALLOW FILTERING";
+ return whereClause;
+ }
+
+ private List<String> getSelectColumns()
+ {
+ List<String> selectColumns = new ArrayList<>();
+
+ if (StringUtils.isEmpty(inputColumns))
+ selectColumns.add("*");
+ else
+ {
+ // We must select all the partition keys plus any other columns the user wants
+ selectColumns.addAll(partitionKeys);
+ for (String column : Splitter.on(',').split(inputColumns))
+ {
+ if (!partitionKeys.contains(column))
+ selectColumns.add(column);
+ }
+ }
+ return selectColumns;
+ }
+
+ private String makeColumnList(Collection<String> columns)
+ {
+ return Joiner.on(',').join(Iterables.transform(columns, new Function<String, String>()
+ {
+ public String apply(String column)
+ {
+ return quote(column);
+ }
+ }));
+ }
+
+ private void fetchKeys()
+ {
+ String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name='" +
+ keyspace + "' and columnfamily_name='" + cfName + "'";
+ List<Row> rows = session.execute(query).all();
+ if (CollectionUtils.isEmpty(rows))
+ {
+ throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName);
+ }
+ int numberOfPartitionKeys = 0;
+ for (Row row : rows)
+ if (row.getString(2).equals("partition_key"))
+ numberOfPartitionKeys++;
+ String[] partitionKeyArray = new String[numberOfPartitionKeys];
+ for (Row row : rows)
+ {
+ String type = row.getString(2);
+ String column = row.getString(0);
+ if (type.equals("partition_key"))
+ {
+ int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
+ partitionKeyArray[componentIndex] = column;
+ }
+ else if (type.equals("clustering_key"))
+ {
+ clusteringKeys.add(column);
+ }
+ }
+ partitionKeys.addAll(Arrays.asList(partitionKeyArray));
+ }
+
+
+
private String quote(String identifier)
{
return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 1e48bf4..eea5d4e 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -177,7 +177,13 @@ public class CqlNativeStorage extends CqlStorage
setConnectionInformation();
CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
- CqlConfigHelper.setInputCql(conf, inputCql);
+ if (inputCql != null)
+ CqlConfigHelper.setInputCql(conf, inputCql);
+ if (columns != null)
+ CqlConfigHelper.setInputColumns(conf, columns);
+ if (whereClause != null)
+ CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+
if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
{
try
@@ -270,6 +276,10 @@ public class CqlNativeStorage extends CqlStorage
nativeSSLCipherSuites = urlQuery.get("cipher_suites");
if (urlQuery.containsKey("input_cql"))
inputCql = urlQuery.get("input_cql");
+ if (urlQuery.containsKey("columns"))
+ columns = urlQuery.get("columns");
+ if (urlQuery.containsKey("where_clause"))
+ whereClause = urlQuery.get("where_clause");
if (urlQuery.containsKey("rpc_port"))
rpcPort = urlQuery.get("rpc_port");
}
@@ -299,7 +309,8 @@ public class CqlNativeStorage extends CqlStorage
"[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
"[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
"[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
- "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage());
+ "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
+ "[columns=<columns>][where_clause=<where_clause>]]': " + e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 02a6d98..53f3900 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -59,9 +59,9 @@ public class CqlStorage extends AbstractCassandraStorage
protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
protected int pageSize = 1000;
- private String columns;
+ protected String columns;
protected String outputQuery;
- private String whereClause;
+ protected String whereClause;
private boolean hasCompactValueAlias = false;
public CqlStorage()