You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/09/11 17:19:32 UTC

[5/9] git commit: Support thrift tables in Pig CqlStorage Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5847

Support thrift tables in Pig CqlStorage
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5847


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f5618e36
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f5618e36
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f5618e36

Branch: refs/heads/trunk
Commit: f5618e36dcec78c0fb791327defad14b4488b235
Parents: 8bedb57
Author: Brandon Williams <br...@apache.org>
Authored: Wed Sep 11 10:16:19 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Sep 11 10:16:19 2013 -0500

----------------------------------------------------------------------
 .../hadoop/pig/AbstractCassandraStorage.java    | 182 ++++++++++++++-----
 .../cassandra/hadoop/pig/CassandraStorage.java  |   8 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |  10 +-
 3 files changed, 147 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5618e36/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 03805d2..68e18c8 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -29,6 +29,9 @@ import java.util.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.CFDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.marshal.*;
@@ -205,6 +208,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                 try
                 {
                     validator = TypeParser.parse(cd.getValidation_class());
+                    if (validator instanceof CounterColumnType)
+                        validator = LongType.instance; 
                     validators.put(cd.name, validator);
                 }
                 catch (ConfigurationException e)
@@ -515,27 +520,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                                                              column_family,
                                                              keyspace));
             }
-            catch (TException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (InvalidRequestException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (UnavailableException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (TimedOutException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (SchemaDisagreementException e)
+            catch (Exception e)
             {
                 throw new RuntimeException(e);
             }
@@ -582,15 +567,19 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                    TimedOutException,
                    SchemaDisagreementException,
                    TException,
-                   CharacterCodingException
+                   CharacterCodingException,
+                   NotFoundException,
+                   org.apache.cassandra.exceptions.InvalidRequestException,
+                   ConfigurationException
     {
         // get CF meta data
-        String query = "SELECT type, " +
+        String query = "SELECT type," +
                        "       comparator," +
                        "       subcomparator," +
-                       "       default_validator, " +
+                       "       default_validator," +
                        "       key_validator," +
-                       "       key_aliases " +
+                       "       key_aliases," +
+                       "       key_alias " +
                        "FROM system.schema_columnfamilies " +
                        "WHERE keyspace_name = '%s' " +
                        "  AND columnfamily_name = '%s' ";
@@ -624,10 +613,27 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             {
                 String keyAliases = ByteBufferUtil.string(cqlRow.columns.get(5).value);
                 keys = FBUtilities.fromJsonList(keyAliases);
+                // classic thrift tables
+                if (keys.size() == 0 && cqlRow.columns.get(6).value == null)
+                {
+                    CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
+                    for (ColumnIdentifier column : cfDefinition.keys.keySet())
+                    {
+                        String key = column.toString();
+                        String type = cfDefinition.keys.get(column).type.toString();
+                        logger.debug("name: {}, type: {} ", key, type);
+                        keys.add(key);
+                    }
+                }
+                else
+                    cql3Table = true;
+            }
+            else
+            {
+                String keyAlias = ByteBufferUtil.string(cqlRow.columns.get(6).value);
+                keys = new ArrayList<String>(1);
+                keys.add(keyAlias);
             }
-            // get column meta data
-            if (keys != null && keys.size() > 0)
-                cql3Table = true;
         }
         cfDef.column_metadata = getColumnMetadata(client, cql3Table);
         return cfDef;
@@ -640,16 +646,22 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             TimedOutException,
             SchemaDisagreementException,
             TException,
-            CharacterCodingException;
+            CharacterCodingException,
+            org.apache.cassandra.exceptions.InvalidRequestException,
+            ConfigurationException,
+            NotFoundException;
 
     /** get column meta data */
-    protected List<ColumnDef> getColumnMeta(Cassandra.Client client)
+    protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage)
             throws InvalidRequestException,
             UnavailableException,
             TimedOutException,
             SchemaDisagreementException,
             TException,
-            CharacterCodingException
+            CharacterCodingException,
+            org.apache.cassandra.exceptions.InvalidRequestException,
+            ConfigurationException,
+            NotFoundException
     {
         String query = "SELECT column_name, " +
                        "       validator, " +
@@ -665,7 +677,34 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
 
         List<CqlRow> rows = result.rows;
         List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
-        if (rows == null || rows.isEmpty())
+        if (!cassandraStorage && (rows == null || rows.isEmpty()))
+        {
+            // check classic thrift tables
+            CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
+            for (ColumnIdentifier column : cfDefinition.metadata.keySet())
+            {
+                ColumnDef cDef = new ColumnDef();
+                String columnName = column.toString();
+                String type = cfDefinition.metadata.get(column).type.toString();
+                logger.debug("name: {}, type: {} ", columnName, type);
+                cDef.name = ByteBufferUtil.bytes(columnName);
+                cDef.validation_class = type;
+                columnDefs.add(cDef);
+            }
+            if (columnDefs.size() == 0)
+            {
+                String value = cfDefinition.value != null ? cfDefinition.value.toString() : null;
+                if ("value".equals(value))
+                {
+                    ColumnDef cDef = new ColumnDef();
+                    cDef.name = ByteBufferUtil.bytes(value);
+                    cDef.validation_class = cfDefinition.value.type.toString();
+                    columnDefs.add(cDef);
+                }
+            }
+            return columnDefs;
+        }
+        else if (rows == null || rows.isEmpty())
             return columnDefs;
 
         Iterator<CqlRow> iterator = rows.iterator();
@@ -683,14 +722,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         return columnDefs;
     }
 
-    /** get keys meta data  */
+    /** get keys meta data */
     protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
-            throws InvalidRequestException,
-            UnavailableException,
-            TimedOutException,
-            SchemaDisagreementException,
-            TException,
-            IOException
+            throws Exception
     {
         String query = "SELECT key_aliases, " +
                        "       column_aliases, " +
@@ -698,7 +732,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                        "       comparator, " +
                        "       keyspace_name, " +
                        "       value_alias, " +
-                       "       default_validator  " +
+                       "       default_validator," +
+                       "       key_alias  " +
                        "FROM system.schema_columnfamilies " +
                        "WHERE keyspace_name = '%s'" +
                        "  AND columnfamily_name = '%s' ";
@@ -719,19 +754,52 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             CqlRow cqlRow = iteraRow.next();
             String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
             logger.debug("Found ksDef name: {}", name);
-            String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
-
-            logger.debug("partition keys: {}", keyString);
-            List<String> keyNames = FBUtilities.fromJsonList(keyString);
- 
-            Iterator<String> iterator = keyNames.iterator();
-            while (iterator.hasNext())
+            String keyString;
+            List<String> keyNames;
+            Iterator<String> iterator;
+            if (cqlRow.columns.get(0).getValue() == null)
             {
                 ColumnDef cDef = new ColumnDef();
-                cDef.name = ByteBufferUtil.bytes(iterator.next());
+                cDef.name = ByteBuffer.wrap(result.rows.get(0).columns.get(7).getValue());
                 keys.add(cDef);
             }
+            else
+            {
+                keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
+
+                logger.debug("partition keys: {}", keyString);
+                keyNames = FBUtilities.fromJsonList(keyString);
+     
+                iterator = keyNames.iterator();
+                while (iterator.hasNext())
+                {
+                    ColumnDef cDef = new ColumnDef();
+                    cDef.name = ByteBufferUtil.bytes(iterator.next());
+                    keys.add(cDef);
+                }
+                // classic thrift tables
+                if (keys.size() == 0)
+                {
+                    CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
+                    for (ColumnIdentifier column : cfDefinition.keys.keySet())
+                    {
+                        String key = column.toString();
+                        logger.debug("name: {} ", key);
+                        ColumnDef cDef = new ColumnDef();
+                        cDef.name = ByteBufferUtil.bytes(key);
+                        keys.add(cDef);
+                    }
+                    for (ColumnIdentifier column : cfDefinition.columns.keySet())
+                    {
+                        String key = column.toString();
+                        logger.debug("name: {} ", key);
+                        ColumnDef cDef = new ColumnDef();
+                        cDef.name = ByteBufferUtil.bytes(key);
+                        keys.add(cDef);
+                    }
+                }
 
+            }
             keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
 
             logger.debug("cluster keys: {}", keyString);
@@ -840,5 +908,23 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         }
         return indexes;
     }
+
+
+    /** Fetch keyspace ks via client.describe_keyspace and build a CFDefinition (via CFMetaData.fromThrift) for the first cf_def whose name equals cf ignoring case; returns null if no cf_def matches. NOTE(review): callers in this patch dereference the result (.keys/.metadata) without a null check. */
    private CFDefinition getCfDefinition(String ks, String cf, Cassandra.Client client)
            throws NotFoundException,
            InvalidRequestException,
            TException,
            org.apache.cassandra.exceptions.InvalidRequestException,
            ConfigurationException
    {
        KsDef ksDef = client.describe_keyspace(ks);
        for (CfDef cfDef : ksDef.cf_defs)
        {
            if (cfDef.name.equalsIgnoreCase(cf))
                return new CFDefinition(CFMetaData.fromThrift(cfDef));
        }
        return null;
    }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5618e36/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index add4395..dbdd5e9 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -25,6 +25,7 @@ import java.util.*;
 
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.hadoop.*;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -702,12 +703,15 @@ public class CassandraStorage extends AbstractCassandraStorage
             TimedOutException, 
             SchemaDisagreementException, 
             TException,
-            CharacterCodingException
+            CharacterCodingException,
+            org.apache.cassandra.exceptions.InvalidRequestException,
+            ConfigurationException,
+            NotFoundException
     {
         if (cql3Table)
             return new ArrayList<ColumnDef>();
         
-        return getColumnMeta(client);
+        return getColumnMeta(client, true);
     }
 
     /** convert key to a tuple */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5618e36/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index a73e5a5..b35e13a 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -26,6 +26,7 @@ import java.util.*;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.hadoop.*;
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.thrift.*;
@@ -432,7 +433,10 @@ public class CqlStorage extends AbstractCassandraStorage
             TimedOutException,
             SchemaDisagreementException,
             TException,
-            CharacterCodingException
+            CharacterCodingException,
+            org.apache.cassandra.exceptions.InvalidRequestException,
+            ConfigurationException,
+            NotFoundException
     {
         List<ColumnDef> keyColumns = null;
         // get key columns
@@ -440,13 +444,13 @@ public class CqlStorage extends AbstractCassandraStorage
         {
             keyColumns = getKeysMeta(client);
         }
-        catch(IOException e)
+        catch(Exception e)
         {
             logger.error("Error in retrieving key columns" , e);   
         }
 
         // get other columns
-        List<ColumnDef> columns = getColumnMeta(client);
+        List<ColumnDef> columns = getColumnMeta(client, false);
 
         // combine all columns in a list
         if (keyColumns != null && columns != null)