You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/03/21 22:58:45 UTC
svn commit: r1083982 - in /cassandra/branches/cassandra-0.7:
contrib/pig/README.txt
contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
src/java/org/apache/cassandra/hadoop/ConfigHelper.java
Author: brandonwilliams
Date: Mon Mar 21 21:58:45 2011
New Revision: 1083982
URL: http://svn.apache.org/viewvc?rev=1083982&view=rev
Log:
Allow specifying a slice predicate for Pig queries
Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-1618
Modified:
cassandra/branches/cassandra-0.7/contrib/pig/README.txt
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
Modified: cassandra/branches/cassandra-0.7/contrib/pig/README.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/README.txt?rev=1083982&r1=1083981&r2=1083982&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/README.txt (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/README.txt Mon Mar 21 21:58:45 2011
@@ -47,6 +47,11 @@ grunt> orderednames = ORDER namecounts B
grunt> topnames = LIMIT orderednames 50;
grunt> dump topnames;
+Slices on columns can also be specified:
+grunt> rows = LOAD 'cassandra://Keyspace1/Standard1?slice_start=C2&slice_end=C4&limit=1&reversed=true' USING CassandraStorage();
+
+Binary values for slice_start and slice_end can be escaped such as '\u0255'
+
Outputting to Cassandra requires the same format from input, so the simplest example is:
grunt> rows = LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage();
Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1083982&r1=1083981&r2=1083982&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Mon Mar 21 21:58:45 2011
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -60,10 +61,16 @@ public class CassandraStorage extends Lo
private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private static final Log logger = LogFactory.getLog(CassandraStorage.class);
+ private ByteBuffer slice_start = BOUND;
+ private ByteBuffer slice_end = BOUND;
+ private boolean slice_reverse = false;
+ private String keyspace;
+ private String column_family;
+
private Configuration conf;
private RecordReader reader;
private RecordWriter writer;
- private final int limit;
+ private int limit;
public CassandraStorage()
{
@@ -149,7 +156,7 @@ public class CassandraStorage extends Lo
this.reader = reader;
}
- private String[] parseLocation(String location) throws IOException
+ private void setLocationFromUri(String location) throws IOException
{
// parse uri into keyspace and columnfamily
String names[];
@@ -157,14 +164,30 @@ public class CassandraStorage extends Lo
{
if (!location.startsWith("cassandra://"))
throw new Exception("Bad scheme.");
- String[] parts = location.split("/+");
- names = new String[]{ parts[1], parts[2] };
+ String[] urlParts = location.split("\\?");
+ if (urlParts.length > 1)
+ {
+ for (String param : urlParts[1].split("&"))
+ {
+ String[] pair = param.split("=");
+ if (pair[0].equals("slice_start"))
+ slice_start = ByteBufferUtil.bytes(pair[1]);
+ else if (pair[0].equals("slice_end"))
+ slice_end = ByteBufferUtil.bytes(pair[1]);
+ else if (pair[0].equals("reversed"))
+ slice_reverse = Boolean.parseBoolean(pair[1]);
+ else if (pair[0].equals("limit"))
+ limit = Integer.parseInt(pair[1]);
+ }
+ }
+ String[] parts = urlParts[0].split("/+");
+ keyspace = parts[1];
+ column_family = parts[2];
}
catch (Exception e)
{
- throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>': " + e.getMessage());
+ throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage());
}
- return names;
}
private void setConnectionInformation() throws IOException
@@ -186,12 +209,15 @@ public class CassandraStorage extends Lo
@Override
public void setLocation(String location, Job job) throws IOException
{
- SliceRange range = new SliceRange(BOUND, BOUND, false, limit);
- SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
conf = job.getConfiguration();
- ConfigHelper.setInputSlicePredicate(conf, predicate);
- String[] names = parseLocation(location);
- ConfigHelper.setInputColumnFamily(conf, names[0], names[1]);
+ setLocationFromUri(location);
+ if (ConfigHelper.getRawInputSlicePredicate(conf) == null)
+ {
+ SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit);
+ SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
+ ConfigHelper.setInputSlicePredicate(conf, predicate);
+ }
+ ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
setConnectionInformation();
}
@@ -214,8 +240,8 @@ public class CassandraStorage extends Lo
public void setStoreLocation(String location, Job job) throws IOException
{
conf = job.getConfiguration();
- String[] names = parseLocation(location);
- ConfigHelper.setOutputColumnFamily(conf, names[0], names[1]);
+ setLocationFromUri(location);
+ ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
setConnectionInformation();
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1083982&r1=1083981&r2=1083982&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Mon Mar 21 21:58:45 2011
@@ -159,6 +159,11 @@ public class ConfigHelper
return predicateFromString(conf.get(INPUT_PREDICATE_CONFIG));
}
+ public static String getRawInputSlicePredicate(Configuration conf)
+ {
+ return conf.get(INPUT_PREDICATE_CONFIG);
+ }
+
private static String predicateToString(SlicePredicate predicate)
{
assert predicate != null;