You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/09/26 20:54:35 UTC

[3/6] git commit: Don't add extraneous field with CqlStorage. Patch by Sam Tunnicliffe; reviewed by Alex Liu for CASSANDRA-6071

Don't add extraneous field with CqlStorage
Patch by Sam Tunnicliffe, reviewed by Alex Liu for CASSANDRA-6071


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/389ff55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/389ff55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/389ff55e

Branch: refs/heads/trunk
Commit: 389ff55e2bbc3046a6ad1aba85bdaab0e38dc6e8
Parents: 00e871d
Author: Brandon Williams <br...@apache.org>
Authored: Thu Sep 26 13:49:07 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Sep 26 13:49:07 2013 -0500

----------------------------------------------------------------------
 .../hadoop/pig/AbstractCassandraStorage.java    | 151 ++-----------------
 .../cassandra/hadoop/pig/CassandraStorage.java  |   2 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 144 +++++++++++++++++-
 3 files changed, 153 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 50671da..ce92014 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -641,7 +641,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             NotFoundException;
 
     /** get column meta data */
-    protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage)
+    protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
             throws InvalidRequestException,
             UnavailableException,
             TimedOutException,
@@ -666,9 +666,13 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
 
         List<CqlRow> rows = result.rows;
         List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
-        if (!cassandraStorage && (rows == null || rows.isEmpty()))
+        if (rows == null || rows.isEmpty())
         {
-            // check classic thrift tables
+            // if CassandraStorage, just return the empty list
+            if (cassandraStorage)
+                return columnDefs;
+
+            // otherwise for CqlStorage, check metadata for classic thrift tables
             CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
             for (ColumnIdentifier column : cfDefinition.metadata.keySet())
             {
@@ -680,7 +684,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                 cDef.validation_class = type;
                 columnDefs.add(cDef);
             }
-            if (columnDefs.size() == 0)
+            // we may not need to include the value column for compact tables as we 
+            // could have already processed it as schema_columnfamilies.value_alias
+            if (columnDefs.size() == 0 && includeCompactValueColumn)
             {
                 String value = cfDefinition.value != null ? cfDefinition.value.toString() : null;
                 if ("value".equals(value))
@@ -693,8 +699,6 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             }
             return columnDefs;
         }
-        else if (rows == null || rows.isEmpty())
-            return columnDefs;
 
         Iterator<CqlRow> iterator = rows.iterator();
         while (iterator.hasNext())
@@ -711,138 +715,6 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         return columnDefs;
     }
 
-    /** get keys meta data */
-    protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
-            throws Exception
-    {
-        String query = "SELECT key_aliases, " +
-                       "       column_aliases, " +
-                       "       key_validator, " +
-                       "       comparator, " +
-                       "       keyspace_name, " +
-                       "       value_alias, " +
-                       "       default_validator " +
-                       "FROM system.schema_columnfamilies " +
-                       "WHERE keyspace_name = '%s'" +
-                       "  AND columnfamily_name = '%s' ";
-
-        CqlResult result = client.execute_cql3_query(
-                                            ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
-                                            Compression.NONE,
-                                            ConsistencyLevel.ONE);
-
-        if (result == null || result.rows == null || result.rows.isEmpty())
-            return null;
-
-        Iterator<CqlRow> iteraRow = result.rows.iterator();
-        List<ColumnDef> keys = new ArrayList<ColumnDef>();
-        if (iteraRow.hasNext())
-        {
-            CqlRow cqlRow = iteraRow.next();
-            String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
-            logger.debug("Found ksDef name: {}", name);
-            String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
-
-            logger.debug("partition keys: {}", keyString);
-            List<String> keyNames = FBUtilities.fromJsonList(keyString);
- 
-            Iterator<String> iterator = keyNames.iterator();
-            while (iterator.hasNext())
-            {
-                ColumnDef cDef = new ColumnDef();
-                cDef.name = ByteBufferUtil.bytes(iterator.next());
-                keys.add(cDef);
-            }
-            // classic thrift tables
-            if (keys.size() == 0)
-            {
-                CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
-                for (ColumnIdentifier column : cfDefinition.keys.keySet())
-                {
-                    String key = column.toString();
-                    logger.debug("name: {} ", key);
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = ByteBufferUtil.bytes(key);
-                    keys.add(cDef);
-                }
-                for (ColumnIdentifier column : cfDefinition.columns.keySet())
-                {
-                    String key = column.toString();
-                    logger.debug("name: {} ", key);
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = ByteBufferUtil.bytes(key);
-                    keys.add(cDef);
-                }
-            }
-
-            keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
-
-            logger.debug("cluster keys: {}", keyString);
-            keyNames = FBUtilities.fromJsonList(keyString);
-
-            iterator = keyNames.iterator();
-            while (iterator.hasNext())
-            {
-                ColumnDef cDef = new ColumnDef();
-                cDef.name = ByteBufferUtil.bytes(iterator.next());
-                keys.add(cDef);
-            }
-
-            String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
-            logger.debug("row key validator: {}", validator);
-            AbstractType<?> keyValidator = parseType(validator);
-
-            Iterator<ColumnDef> keyItera = keys.iterator();
-            if (keyValidator instanceof CompositeType)
-            {
-                Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
-                while (typeItera.hasNext())
-                    keyItera.next().validation_class = typeItera.next().toString();
-            }
-            else
-                keyItera.next().validation_class = keyValidator.toString();
-
-            validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
-            logger.debug("cluster key validator: {}", validator);
-
-            if (keyItera.hasNext() && validator != null && !validator.isEmpty())
-            {
-                AbstractType<?> clusterKeyValidator = parseType(validator);
-
-                if (clusterKeyValidator instanceof CompositeType)
-                {
-                    Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
-                    while (keyItera.hasNext())
-                        keyItera.next().validation_class = typeItera.next().toString();
-                }
-                else
-                    keyItera.next().validation_class = clusterKeyValidator.toString();
-            }
-
-            // compact value_alias column
-            if (cqlRow.columns.get(5).value != null)
-            {
-                try
-                {
-                    String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
-                    logger.debug("default validator: {}", compactValidator);
-                    AbstractType<?> defaultValidator = parseType(compactValidator);
-
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = cqlRow.columns.get(5).value;
-                    cDef.validation_class = defaultValidator.toString();
-                    keys.add(cDef);
-                }
-                catch (Exception e)
-                {
-                    // no compact column at value_alias
-                }
-            }
-
-        }
-        return keys;
-    }
-
     /** get index type from string */
     protected IndexType getIndexType(String type)
     {
@@ -884,9 +756,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         return indexes;
     }
 
-
     /** get CFDefinition of a column family */
-    private CFDefinition getCfDefinition(String ks, String cf, Cassandra.Client client)
+    protected CFDefinition getCfDefinition(String ks, String cf, Cassandra.Client client)
             throws NotFoundException,
             InvalidRequestException,
             TException,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 8cf06f2..09171a0 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -711,7 +711,7 @@ public class CassandraStorage extends AbstractCassandraStorage
         if (cql3Table)
             return new ArrayList<ColumnDef>();
         
-        return getColumnMeta(client, true);
+        return getColumnMeta(client, true, true);
     }
 
     /** convert key to a tuple */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/389ff55e/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 7780ca9..79abc2c 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -23,6 +23,8 @@ import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
 
+import org.apache.cassandra.cql3.CFDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.marshal.*;
@@ -31,6 +33,8 @@ import org.apache.cassandra.hadoop.*;
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
 import org.apache.hadoop.mapreduce.*;
 import org.apache.pig.Expression;
 import org.apache.pig.Expression.OpType;
@@ -61,7 +65,8 @@ public class CqlStorage extends AbstractCassandraStorage
     private String columns;
     private String outputQuery;
     private String whereClause;
-
+    private boolean hasCompactValueAlias = false;
+        
     public CqlStorage()
     {
         this(1000);
@@ -450,7 +455,7 @@ public class CqlStorage extends AbstractCassandraStorage
         }
 
         // get other columns
-        List<ColumnDef> columns = getColumnMeta(client, false);
+        List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
 
         // combine all columns in a list
         if (keyColumns != null && columns != null)
@@ -458,7 +463,140 @@ public class CqlStorage extends AbstractCassandraStorage
 
         return keyColumns;
     }
-    
+
+    /** get keys meta data from system.schema_columnfamilies: partition keys, cluster keys and, if value_alias is set, the compact value column; returns null when the query yields no rows */
+    protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
+            throws Exception
+    {
+        String query = "SELECT key_aliases, " +
+                "       column_aliases, " +
+                "       key_validator, " +
+                "       comparator, " +
+                "       keyspace_name, " +
+                "       value_alias, " +
+                "       default_validator " +
+                "FROM system.schema_columnfamilies " +
+                "WHERE keyspace_name = '%s'" +
+                "  AND columnfamily_name = '%s' ";
+
+        CqlResult result = client.execute_cql3_query(
+                ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
+                Compression.NONE,
+                ConsistencyLevel.ONE);
+
+        if (result == null || result.rows == null || result.rows.isEmpty())
+            return null;
+
+        Iterator<CqlRow> iteraRow = result.rows.iterator();
+        List<ColumnDef> keys = new ArrayList<ColumnDef>();
+        if (iteraRow.hasNext())
+        {
+            CqlRow cqlRow = iteraRow.next();
+            String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
+            logger.debug("Found ksDef name: {}", name);
+            String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
+
+            logger.debug("partition keys: {}", keyString);
+            List<String> keyNames = FBUtilities.fromJsonList(keyString);
+
+            Iterator<String> iterator = keyNames.iterator();
+            while (iterator.hasNext())
+            {
+                ColumnDef cDef = new ColumnDef();
+                cDef.name = ByteBufferUtil.bytes(iterator.next());
+                keys.add(cDef);
+            }
+            // classic thrift tables: key_aliases produced no names, so take the key and column names from the CFDefinition instead
+            if (keys.size() == 0)
+            {
+                CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
+                for (ColumnIdentifier column : cfDefinition.keys.keySet())
+                {
+                    String key = column.toString();
+                    logger.debug("name: {} ", key);
+                    ColumnDef cDef = new ColumnDef();
+                    cDef.name = ByteBufferUtil.bytes(key);
+                    keys.add(cDef);
+                }
+                for (ColumnIdentifier column : cfDefinition.columns.keySet())
+                {
+                    String key = column.toString();
+                    logger.debug("name: {} ", key);
+                    ColumnDef cDef = new ColumnDef();
+                    cDef.name = ByteBufferUtil.bytes(key);
+                    keys.add(cDef);
+                }
+            }
+
+            keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
+
+            logger.debug("cluster keys: {}", keyString);
+            keyNames = FBUtilities.fromJsonList(keyString);
+
+            iterator = keyNames.iterator();
+            while (iterator.hasNext())
+            {
+                ColumnDef cDef = new ColumnDef();
+                cDef.name = ByteBufferUtil.bytes(iterator.next());
+                keys.add(cDef);
+            }
+
+            String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
+            logger.debug("row key validator: {}", validator);
+            AbstractType<?> keyValidator = parseType(validator);
+
+            Iterator<ColumnDef> keyItera = keys.iterator();
+            if (keyValidator instanceof CompositeType)
+            {
+                Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
+                while (typeItera.hasNext())
+                    keyItera.next().validation_class = typeItera.next().toString();
+            }
+            else
+                keyItera.next().validation_class = keyValidator.toString();
+
+            validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
+            logger.debug("cluster key validator: {}", validator);
+
+            if (keyItera.hasNext() && validator != null && !validator.isEmpty())
+            {
+                AbstractType<?> clusterKeyValidator = parseType(validator);
+
+                if (clusterKeyValidator instanceof CompositeType)
+                {
+                    Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
+                    while (keyItera.hasNext())
+                        keyItera.next().validation_class = typeItera.next().toString();
+                }
+                else
+                    keyItera.next().validation_class = clusterKeyValidator.toString();
+            }
+
+            // compact value_alias column: named by value_alias, typed by default_validator, and recorded in hasCompactValueAlias once added
+            if (cqlRow.columns.get(5).value != null)
+            {
+                try
+                {
+                    String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
+                    logger.debug("default validator: {}", compactValidator);
+                    AbstractType<?> defaultValidator = parseType(compactValidator);
+
+                    ColumnDef cDef = new ColumnDef();
+                    cDef.name = cqlRow.columns.get(5).value;
+                    cDef.validation_class = defaultValidator.toString();
+                    keys.add(cDef);
+                    hasCompactValueAlias = true;
+                }
+                catch (Exception e)
+                {
+                    // no compact column at value_alias: the exception is ignored and hasCompactValueAlias is left unchanged
+                }
+            }
+
+        }
+        return keys;
+    }
+
     /** cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>]
      * [&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>]
      * [&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]] */