You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/08/11 20:04:00 UTC

[05/14] git commit: Give CRR a default input_cql Statement

Give CRR a default input_cql Statement

Patch by Mike Adamson, reviewed by brandonwilliams for CASSANDRA-7226


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52df514d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52df514d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52df514d

Branch: refs/heads/cassandra-2.1.0
Commit: 52df514dd1a95d4fc4d699d6ffa9d3bf7e844854
Parents: bd0bb6d
Author: Brandon Williams <br...@apache.org>
Authored: Mon Aug 11 13:00:39 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Aug 11 13:02:34 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  | 138 ++++++++++++++++++-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |  15 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   4 +-
 5 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd51e04..ddf4627 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * Give CRR a default input_cql Statement (CASSANDRA-7226)
  * Better error message when adding a collection with the same name
    than a previously dropped one (CASSANDRA-6276)
  * Fix validation when adding static columns (CASSANDRA-7730)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 7a5fd47..b2c8fbf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -60,7 +60,7 @@ import com.google.common.collect.Sets;
 
 public class CqlConfigHelper
 {
-    private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon ,
+    private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns";
     private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size";
     private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause";
     private static final String INPUT_CQL = "cassandra.input.cql";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 88c5c33..74310cf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -24,9 +24,17 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+
 import org.apache.cassandra.hadoop.HadoopCompat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +71,8 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 {
     private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class);
 
+    public static final int DEFAULT_CQL_PAGE_LIMIT = 1000;
+
     private ColumnFamilySplit split;
     private RowIterator rowIterator;
 
@@ -74,6 +84,12 @@ public class CqlRecordReader extends RecordReader<Long, Row>
     private Cluster cluster;
     private Session session;
     private IPartitioner partitioner;
+    private String inputColumns;
+    private String userDefinedWhereClauses;
+    private int pageRowSize;
+
+    private List<String> partitionKeys = new ArrayList<>();
+    private List<String> clusteringKeys = new ArrayList<>();
 
     // partition keys -- key aliases
     private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
@@ -92,8 +108,18 @@ public class CqlRecordReader extends RecordReader<Long, Row>
                       : ConfigHelper.getInputSplitSize(conf);
         cfName = quote(ConfigHelper.getInputColumnFamily(conf));
         keyspace = quote(ConfigHelper.getInputKeyspace(conf));
-        cqlQuery = CqlConfigHelper.getInputCql(conf);
-        partitioner = ConfigHelper.getInputPartitioner(HadoopCompat.getConfiguration(context));
+        partitioner = ConfigHelper.getInputPartitioner(conf);
+        inputColumns = CqlConfigHelper.getInputcolumns(conf);
+        userDefinedWhereClauses = CqlConfigHelper.getInputWhereClauses(conf);
+        Optional<Integer> pageRowSizeOptional = CqlConfigHelper.getInputPageRowSize(conf);
+        try
+        {
+            pageRowSize = pageRowSizeOptional.isPresent() ? pageRowSizeOptional.get() : DEFAULT_CQL_PAGE_LIMIT;
+        }
+        catch(NumberFormatException e)
+        {
+            pageRowSize = DEFAULT_CQL_PAGE_LIMIT;
+        }
         try
         {
             if (cluster != null)
@@ -125,6 +151,20 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
         if (cluster != null)
             session = cluster.connect(keyspace);
+
+        if (session == null)
+          throw new RuntimeException("Can't create connection session");
+
+        // If the user provides a CQL query then we will use it without validation
+        // otherwise we will fall back to building a query using the:
+        //   inputColumns
+        //   whereClauses
+        //   pageRowSize
+        cqlQuery = CqlConfigHelper.getInputCql(conf);
+        if (StringUtils.isEmpty(cqlQuery))
+            cqlQuery = buildQuery();
+        logger.debug("cqlQuery {}", cqlQuery);
+
         rowIterator = new RowIterator();
         logger.debug("created {}", rowIterator);
     }
@@ -224,9 +264,6 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
         public RowIterator()
         {
-            if (session == null)
-                throw new RuntimeException("Can't create connection session");
-
             AbstractType type = partitioner.getTokenValidator();
             ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) );
             for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey())
@@ -487,6 +524,97 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         }
     }
 
+    /**
+     * Build a query for the reader of the form:
+     *
+     * SELECT cols|* FROM ks.cf WHERE token(pk1,...pkn)>? AND token(pk1,...pkn)<=?
+     *   [AND user where clauses ALLOW FILTERING]
+     *
+     * The two bind markers receive the split's start (exclusive) and end (inclusive) tokens.
+     */
+    private String buildQuery()
+    {
+        fetchKeys();
+        // Without a user column list select everything; quoting "*" would name a column called *.
+        String selectColumnList = StringUtils.isEmpty(inputColumns) ? "*" : makeColumnList(getSelectColumns());
+        String partitionKeyList = makeColumnList(partitionKeys);
+
+        return String.format("SELECT %s FROM %s.%s WHERE token(%s)>? AND token(%s)<=?" + getAdditionalWhereClauses(),
+                             selectColumnList, keyspace, cfName, partitionKeyList, partitionKeyList);
+    }
+
+    private String getAdditionalWhereClauses()
+    {
+        // No LIMIT on purpose: this query is run once per split and covers its whole token range,
+        // so "LIMIT pageRowSize" would silently drop every row past pageRowSize in each split.
+        if (StringUtils.isEmpty(userDefinedWhereClauses))
+            return "";
+        return " AND " + userDefinedWhereClauses + " ALLOW FILTERING";
+    }
+
+    /** Columns to select: "*" if no column list was given, else the partition keys followed by the user's columns. */
+    private List<String> getSelectColumns()
+    {
+        List<String> selectColumns = new ArrayList<>();
+        if (StringUtils.isEmpty(inputColumns))
+        {
+            selectColumns.add("*");
+            return selectColumns;
+        }
+        // Partition keys are always selected; trim entries so "a, b" works, and skip empties/duplicates.
+        selectColumns.addAll(partitionKeys);
+        for (String column : Splitter.on(',').trimResults().omitEmptyStrings().split(inputColumns))
+        {
+            if (!selectColumns.contains(column))
+                selectColumns.add(column);
+        }
+        return selectColumns;
+    }
+
+    /**
+     * Quotes each identifier and joins them with commas (no spaces), e.g. [a, b] -> "a","b".
+     */
+    private String makeColumnList(Collection<String> columns)
+    {
+        List<String> quoted = new ArrayList<>(columns.size());
+        for (String column : columns)
+            quoted.add(quote(column));
+        return Joiner.on(',').join(quoted);
+    }
+
+    /** Loads the table's partition key columns (ordered by component_index) and its clustering key columns. */
+    private void fetchKeys()
+    {
+        // keyspace and cfName hold quoted identifiers (see quote()), but system.schema_columns stores the
+        // raw names: undo the quoting and bind the names as values rather than splicing them into the CQL.
+        String rawKeyspace = keyspace.substring(1, keyspace.length() - 1).replace("\"\"", "\"");
+        String rawCfName = cfName.substring(1, cfName.length() - 1).replace("\"\"", "\"");
+        String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name=? and columnfamily_name=?";
+        List<Row> rows = session.execute(query, rawKeyspace, rawCfName).all();
+        if (CollectionUtils.isEmpty(rows))
+            throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName);
+        int numberOfPartitionKeys = 0;
+        for (Row row : rows)
+            if (row.getString(2).equals("partition_key"))
+                numberOfPartitionKeys++;
+        String[] partitionKeyArray = new String[numberOfPartitionKeys];
+        for (Row row : rows)
+        {
+            String type = row.getString(2);
+            String column = row.getString(0);
+            if (type.equals("partition_key"))
+            {
+                int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
+                partitionKeyArray[componentIndex] = column;
+            }
+            else if (type.equals("clustering_key"))
+                clusteringKeys.add(column);
+        }
+        partitionKeys.addAll(Arrays.asList(partitionKeyArray));
+    }
+
+
+
     private String quote(String identifier)
     {
         return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 1e48bf4..eea5d4e 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -177,7 +177,13 @@ public class CqlNativeStorage extends CqlStorage
         setConnectionInformation();
 
         CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
-        CqlConfigHelper.setInputCql(conf, inputCql);
+        if (inputCql != null)
+            CqlConfigHelper.setInputCql(conf, inputCql);
+        if (columns != null)
+            CqlConfigHelper.setInputColumns(conf, columns);
+        if (whereClause != null)
+            CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+
         if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
         {
             try
@@ -270,6 +276,10 @@ public class CqlNativeStorage extends CqlStorage
                     nativeSSLCipherSuites = urlQuery.get("cipher_suites");
                 if (urlQuery.containsKey("input_cql"))
                     inputCql = urlQuery.get("input_cql");
+                if (urlQuery.containsKey("columns"))
+                    columns = urlQuery.get("columns");
+                if (urlQuery.containsKey("where_clause"))
+                    whereClause = urlQuery.get("where_clause");
                 if (urlQuery.containsKey("rpc_port"))
                     rpcPort = urlQuery.get("rpc_port");
             }
@@ -299,7 +309,8 @@ public class CqlNativeStorage extends CqlStorage
                     "[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
                     "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
                     "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
-                    "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage());
+                    "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
+                    "[columns=<columns>][where_clause=<where_clause>]]': " + e.getMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52df514d/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 02a6d98..53f3900 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -59,9 +59,9 @@ public class CqlStorage extends AbstractCassandraStorage
     protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
 
     protected int pageSize = 1000;
-    private String columns;
+    protected String columns;
     protected String outputQuery;
-    private String whereClause;
+    protected String whereClause;
     private boolean hasCompactValueAlias = false;
         
     public CqlStorage()