You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/03/21 22:58:45 UTC

svn commit: r1083982 - in /cassandra/branches/cassandra-0.7: contrib/pig/README.txt contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java src/java/org/apache/cassandra/hadoop/ConfigHelper.java

Author: brandonwilliams
Date: Mon Mar 21 21:58:45 2011
New Revision: 1083982

URL: http://svn.apache.org/viewvc?rev=1083982&view=rev
Log:
Allow specifying a slice predicate for Pig queries
Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-1618

Modified:
    cassandra/branches/cassandra-0.7/contrib/pig/README.txt
    cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java

Modified: cassandra/branches/cassandra-0.7/contrib/pig/README.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/README.txt?rev=1083982&r1=1083981&r2=1083982&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/README.txt (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/README.txt Mon Mar 21 21:58:45 2011
@@ -47,6 +47,11 @@ grunt> orderednames = ORDER namecounts B
 grunt> topnames = LIMIT orderednames 50;
 grunt> dump topnames;
 
+Slices on columns can also be specified:
+grunt> rows = LOAD 'cassandra://Keyspace1/Standard1?slice_start=C2&slice_end=C4&limit=1&reversed=true' USING CassandraStorage();
+
+Binary values for slice_start and slice_end can be escaped, for example '\u0255'.
+
 Outputting to Cassandra requires the same format from input, so the simplest example is:
 
 grunt> rows = LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage();

Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1083982&r1=1083981&r2=1083982&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Mon Mar 21 21:58:45 2011
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -60,10 +61,16 @@ public class CassandraStorage extends Lo
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
 
+    private ByteBuffer slice_start = BOUND;
+    private ByteBuffer slice_end = BOUND;
+    private boolean slice_reverse = false;
+    private String keyspace;
+    private String column_family;
+
     private Configuration conf;
     private RecordReader reader;
     private RecordWriter writer;
-    private final int limit;
+    private int limit;
 
     public CassandraStorage() 
     { 
@@ -149,7 +156,7 @@ public class CassandraStorage extends Lo
         this.reader = reader;
     }
 
-    private String[] parseLocation(String location) throws IOException
+    private void setLocationFromUri(String location) throws IOException
     {
         // parse uri into keyspace and columnfamily
         String names[];
@@ -157,14 +164,30 @@ public class CassandraStorage extends Lo
         {
             if (!location.startsWith("cassandra://"))
                 throw new Exception("Bad scheme.");
-            String[] parts = location.split("/+");
-            names = new String[]{ parts[1], parts[2] };
+            String[] urlParts = location.split("\\?");
+            if (urlParts.length > 1)
+            {
+                for (String param : urlParts[1].split("&"))
+                {
+                    String[] pair = param.split("=");
+                    if (pair[0].equals("slice_start"))
+                        slice_start = ByteBufferUtil.bytes(pair[1]);
+                    else if (pair[0].equals("slice_end"))
+                        slice_end = ByteBufferUtil.bytes(pair[1]);
+                    else if (pair[0].equals("reversed"))
+                        slice_reverse = Boolean.parseBoolean(pair[1]);
+                    else if (pair[0].equals("limit"))
+                        limit = Integer.parseInt(pair[1]);
+                }
+            }
+            String[] parts = urlParts[0].split("/+");
+            keyspace = parts[1];
+            column_family = parts[2];
         }
         catch (Exception e)
         {
-            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>': " + e.getMessage());
+            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage());
         }
-       return names;
     }
 
     private void setConnectionInformation() throws IOException
@@ -186,12 +209,15 @@ public class CassandraStorage extends Lo
     @Override
     public void setLocation(String location, Job job) throws IOException
     {
-        SliceRange range = new SliceRange(BOUND, BOUND, false, limit);
-        SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
         conf = job.getConfiguration();
-        ConfigHelper.setInputSlicePredicate(conf, predicate);
-        String[] names = parseLocation(location);
-        ConfigHelper.setInputColumnFamily(conf, names[0], names[1]);
+        setLocationFromUri(location);
+        if (ConfigHelper.getRawInputSlicePredicate(conf) == null)
+        {
+            SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit);
+            SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
+            ConfigHelper.setInputSlicePredicate(conf, predicate);
+        }
+        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
     }
 
@@ -214,8 +240,8 @@ public class CassandraStorage extends Lo
     public void setStoreLocation(String location, Job job) throws IOException
     {
         conf = job.getConfiguration();
-        String[] names = parseLocation(location);
-        ConfigHelper.setOutputColumnFamily(conf, names[0], names[1]);
+        setLocationFromUri(location);
+        ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
     }
 

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1083982&r1=1083981&r2=1083982&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Mon Mar 21 21:58:45 2011
@@ -159,6 +159,11 @@ public class ConfigHelper
         return predicateFromString(conf.get(INPUT_PREDICATE_CONFIG));
     }
 
+    public static String getRawInputSlicePredicate(Configuration conf)
+    {
+        return conf.get(INPUT_PREDICATE_CONFIG);
+    }
+
     private static String predicateToString(SlicePredicate predicate)
     {
         assert predicate != null;