You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/10/14 23:57:30 UTC
svn commit: r1183518 - in /cassandra/branches/cassandra-0.8: CHANGES.txt
contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Author: brandonwilliams
Date: Fri Oct 14 21:57:29 2011
New Revision: 1183518
URL: http://svn.apache.org/viewvc?rev=1183518&view=rev
Log:
Add 0.8+ types and key validation type to pig schema.
Patch by Steeve Morin, reviewed by brandonwilliams for CASSANDRA-3280
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1183518&r1=1183517&r2=1183518&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Oct 14 21:57:29 2011
@@ -15,6 +15,8 @@
* (Hadoop) accept comma delimited lists of initial thrift connections
(CASSANDRA-3185)
* ColumnFamily min_compaction_threshold should be >= 2 (CASSANDRA-3342)
+ * (Pig) add 0.8+ types and key validation type in schema (CASSANDRA-3280)
+
0.8.7
* Kill server on wrapped OOME such as from FileChannel.map (CASSANDRA-3201)
Modified: cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1183518&r1=1183517&r2=1183518&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Fri Oct 14 21:57:29 2011
@@ -107,7 +107,7 @@ public class CassandraStorage extends Lo
return limit;
}
- @Override
+ @Override
public Tuple getNext() throws IOException
{
try
@@ -122,7 +122,7 @@ public class CassandraStorage extends Lo
assert key != null && cf != null;
// and wrap it in a tuple
- Tuple tuple = TupleFactory.getInstance().newTuple(2);
+ Tuple tuple = TupleFactory.getInstance().newTuple(2);
ArrayList<Tuple> columns = new ArrayList<Tuple>();
tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
@@ -187,10 +187,12 @@ public class CassandraStorage extends Lo
ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
AbstractType comparator = null;
AbstractType default_validator = null;
+ AbstractType key_validator = null;
try
{
- comparator = TypeParser.parse(cfDef.comparator_type);
- default_validator = TypeParser.parse(cfDef.default_validation_class);
+ comparator = TypeParser.parse(cfDef.getComparator_type());
+ default_validator = TypeParser.parse(cfDef.getDefault_validation_class());
+ key_validator = TypeParser.parse(cfDef.getKey_validation_class());
}
catch (ConfigurationException e)
{
@@ -199,13 +201,14 @@ public class CassandraStorage extends Lo
marshallers.add(comparator);
marshallers.add(default_validator);
+ marshallers.add(key_validator);
return marshallers;
}
- private Map<ByteBuffer,AbstractType> getValidatorMap(CfDef cfDef) throws IOException
+ private Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
{
Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
- for (ColumnDef cd : cfDef.column_metadata)
+ for (ColumnDef cd : cfDef.getColumn_metadata())
{
if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
{
@@ -236,6 +239,18 @@ public class CassandraStorage extends Lo
this.reader = reader;
}
+ public static Map<String, String> getQueryMap(String query)
+ {
+ String[] params = query.split("&");
+ Map<String, String> map = new HashMap<String, String>();
+ for (String param : params)
+ {
+ String[] keyValue = param.split("=");
+ map.put(keyValue[0], keyValue[1]);
+ }
+ return map;
+ }
+
private void setLocationFromUri(String location) throws IOException
{
// parse uri into keyspace and columnfamily
@@ -247,18 +262,18 @@ public class CassandraStorage extends Lo
String[] urlParts = location.split("\\?");
if (urlParts.length > 1)
{
- for (String param : urlParts[1].split("&"))
- {
- String[] pair = param.split("=");
- if (pair[0].equals("slice_start"))
- slice_start = ByteBufferUtil.bytes(pair[1]);
- else if (pair[0].equals("slice_end"))
- slice_end = ByteBufferUtil.bytes(pair[1]);
- else if (pair[0].equals("reversed"))
- slice_reverse = Boolean.parseBoolean(pair[1]);
- else if (pair[0].equals("limit"))
- limit = Integer.parseInt(pair[1]);
- }
+ Map<String, String> urlQuery = getQueryMap(urlParts[1]);
+ AbstractType comparator = BytesType.instance;
+ if (urlQuery.containsKey("comparator"))
+ comparator = TypeParser.parse(urlQuery.get("comparator"));
+ if (urlQuery.containsKey("slice_start"))
+ slice_start = comparator.fromString(urlQuery.get("slice_start"));
+ if (urlQuery.containsKey("slice_end"))
+ slice_end = comparator.fromString(urlQuery.get("slice_end"));
+ if (urlQuery.containsKey("reversed"))
+ slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
+ if (urlQuery.containsKey("limit"))
+ limit = Integer.parseInt(urlQuery.get("limit"));
}
String[] parts = urlParts[0].split("/+");
keyspace = parts[1];
@@ -312,10 +327,14 @@ public class CassandraStorage extends Lo
// top-level schema, no type
ResourceSchema schema = new ResourceSchema();
+ // get default marshallers and validators
+ List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+
// add key
ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
keyFieldSchema.setName("key");
- keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type
+ keyFieldSchema.setType(getPigType(marshallers.get(2)));
// will become the bag of tuples
ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
@@ -323,9 +342,6 @@ public class CassandraStorage extends Lo
bagFieldSchema.setType(DataType.BAG);
ResourceSchema bagSchema = new ResourceSchema();
-
- List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
- Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>();
// default comparator/validator
@@ -381,6 +397,10 @@ public class CassandraStorage extends Lo
return DataType.CHARARRAY;
else if (type instanceof UTF8Type)
return DataType.CHARARRAY;
+ else if (type instanceof FloatType)
+ return DataType.FLOAT;
+ else if (type instanceof DoubleType)
+ return DataType.DOUBLE;
return DataType.BYTEARRAY;
}