You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/04/13 19:42:16 UTC
svn commit: r1091857 -
/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Author: brandonwilliams
Date: Wed Apr 13 17:42:15 2011
New Revision: 1091857
URL: http://svn.apache.org/viewvc?rev=1091857&view=rev
Log:
Allow pig to use multiple schemas, fix BytesTypes cast during storage.
Patch by Jeremy Hanna, reviewed by brandonwilliams for CASSANDRA-2465
Modified:
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1091857&r1=1091856&r2=1091857&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Wed Apr 13 17:42:15 2011
@@ -68,7 +68,7 @@ public class CassandraStorage extends Lo
public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
- private static String UDFCONTEXT_SCHEMA_KEY = "cassandra.schema";
+ private static String UDFCONTEXT_SCHEMA_KEY_PREFIX = "cassandra.schema";
private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -169,7 +169,7 @@ public class CassandraStorage extends Lo
{
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(CassandraStorage.class);
- return cfdefFromString(property.getProperty(UDFCONTEXT_SCHEMA_KEY));
+ return cfdefFromString(property.getProperty(getSchemaContextKey()));
}
private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
@@ -226,7 +226,7 @@ public class CassandraStorage extends Lo
this.reader = reader;
}
- private void setLocationFromUri(String location) throws IOException
+ private void setLocationFromUri(String location) throws IOException
{
// parse uri into keyspace and columnfamily
String names[];
@@ -396,7 +396,7 @@ public class CassandraStorage extends Lo
if (validators.get(column.name) == null)
// Have to special case BytesType to convert DataByteArray into ByteBuffer
if (marshallers.get(1) instanceof BytesType)
- column.value = ByteBuffer.wrap(((DataByteArray) pair.get(1)).get());
+ column.value = objToBB(pair.get(1));
else
column.value = marshallers.get(1).decompose(pair.get(1));
else
@@ -446,9 +446,10 @@ public class CassandraStorage extends Lo
{
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(CassandraStorage.class);
-
+
+ String schemaContextKey = getSchemaContextKey();
// Only get the schema if we haven't already gotten it
- if (!property.containsKey(UDFCONTEXT_SCHEMA_KEY))
+ if (!property.containsKey(schemaContextKey))
{
Cassandra.Client client = null;
try
@@ -466,7 +467,7 @@ public class CassandraStorage extends Lo
break;
}
}
- property.setProperty(UDFCONTEXT_SCHEMA_KEY, cfdefToString(cfDef));
+ property.setProperty(schemaContextKey, cfdefToString(cfDef));
}
catch (TException e)
{
@@ -532,4 +533,14 @@ public class CassandraStorage extends Lo
}
return cfDef;
}
+
+ private String getSchemaContextKey()
+ {
+ StringBuilder sb = new StringBuilder(UDFCONTEXT_SCHEMA_KEY_PREFIX);
+ sb.append('.');
+ sb.append(keyspace);
+ sb.append('.');
+ sb.append(column_family);
+ return sb.toString();
+ }
}