You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/04/13 19:42:16 UTC

svn commit: r1091857 - /cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java

Author: brandonwilliams
Date: Wed Apr 13 17:42:15 2011
New Revision: 1091857

URL: http://svn.apache.org/viewvc?rev=1091857&view=rev
Log:
Allow pig to use multiple schemas, fix BytesType cast during storage.
Patch by Jeremy Hanna, reviewed by brandonwilliams for CASSANDRA-2465

Modified:
    cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java

Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1091857&r1=1091856&r2=1091857&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Wed Apr 13 17:42:15 2011
@@ -68,7 +68,7 @@ public class CassandraStorage extends Lo
     public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
     public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
 
-    private static String UDFCONTEXT_SCHEMA_KEY = "cassandra.schema";
+    private static String UDFCONTEXT_SCHEMA_KEY_PREFIX = "cassandra.schema";
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -169,7 +169,7 @@ public class CassandraStorage extends Lo
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(CassandraStorage.class);
-        return cfdefFromString(property.getProperty(UDFCONTEXT_SCHEMA_KEY));
+        return cfdefFromString(property.getProperty(getSchemaContextKey()));
     }
 
     private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
@@ -226,7 +226,7 @@ public class CassandraStorage extends Lo
         this.reader = reader;
     }
 
-     private void setLocationFromUri(String location) throws IOException
+    private void setLocationFromUri(String location) throws IOException
     {
         // parse uri into keyspace and columnfamily
         String names[];
@@ -396,7 +396,7 @@ public class CassandraStorage extends Lo
                        if (validators.get(column.name) == null)
                            // Have to special case BytesType to convert DataByteArray into ByteBuffer
                            if (marshallers.get(1) instanceof BytesType)
-                               column.value = ByteBuffer.wrap(((DataByteArray) pair.get(1)).get());
+                               column.value = objToBB(pair.get(1));
                            else
                                column.value = marshallers.get(1).decompose(pair.get(1));
                        else
@@ -446,9 +446,10 @@ public class CassandraStorage extends Lo
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(CassandraStorage.class);
-        
+
+        String schemaContextKey = getSchemaContextKey();
         // Only get the schema if we haven't already gotten it
-        if (!property.containsKey(UDFCONTEXT_SCHEMA_KEY))
+        if (!property.containsKey(schemaContextKey))
         {
             Cassandra.Client client = null;
             try
@@ -466,7 +467,7 @@ public class CassandraStorage extends Lo
                         break;
                     }
                 }
-                property.setProperty(UDFCONTEXT_SCHEMA_KEY, cfdefToString(cfDef));
+                property.setProperty(schemaContextKey, cfdefToString(cfDef));
             }
             catch (TException e)
             {
@@ -532,4 +533,14 @@ public class CassandraStorage extends Lo
         }
         return cfDef;
     }
+
+    private String getSchemaContextKey()
+    {
+        StringBuilder sb = new StringBuilder(UDFCONTEXT_SCHEMA_KEY_PREFIX);
+        sb.append('.');
+        sb.append(keyspace);
+        sb.append('.');
+        sb.append(column_family);
+        return sb.toString();
+    }
 }