Posted to commits@cassandra.apache.org by br...@apache.org on 2013/06/26 23:56:36 UTC

[1/2] Pig: support for cql3 tables Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5234

Updated Branches:
  refs/heads/cassandra-1.2 54266ea70 -> 33a3d2ca5


http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
new file mode 100644
index 0000000..004b319
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.pig;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.*;
+
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.pig.Expression;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.*;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A LoadStoreFunc for retrieving data from and storing data to Cassandra
+ *
+ * A row from a standard CF will be returned as nested tuples: 
+ * (((key1, value1), (key2, value2)), ((name1, val1), (name2, val2))).
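+ *
+ * For example, a hypothetical table with key columns (id, ts) and value
+ * columns (age, name) could yield:
+ * ((('id', 1), ('ts', 1234)), (('age', 30), ('name', 'fred'))).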
+ */
+public class CqlStorage extends AbstractCassandraStorage
+{
+    private static final Logger logger = LoggerFactory.getLogger(CqlStorage.class);
+
+    private RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> reader;
+    private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
+
+    private int pageSize = 1000;
+    private String columns;
+    private String outputQuery;
+    private String whereClause;
+
+    public CqlStorage()
+    {
+        this(1000);
+    }
+
+    /** @param pageSize number of CQL rows to fetch in a thrift request */
+    public CqlStorage(int pageSize)
+    {
+        super();
+        this.pageSize = pageSize;
+        DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat";
+        DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlOutputFormat";
+    }   
+
+    public void prepareToRead(RecordReader reader, PigSplit split)
+    {
+        this.reader = reader;
+    }
+
+    /** get next row */
+    public Tuple getNext() throws IOException
+    {
+        try
+        {
+            // load the next pair
+            if (!reader.nextKeyValue())
+                return null;
+
+            CfDef cfDef = getCfDef(loadSignature);
+            Map<String, ByteBuffer> keys = reader.getCurrentKey();
+            Map<String, ByteBuffer> columns = reader.getCurrentValue();
+            assert keys != null && columns != null;
+
+            // add key columns to the map
+            for (Map.Entry<String,ByteBuffer> key : keys.entrySet())
+                columns.put(key.getKey(), key.getValue());
+
+            Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size());
+            Iterator<ColumnDef> itera = cfDef.column_metadata.iterator();
+            int i = 0;
+            while (itera.hasNext())
+            {
+                ColumnDef cdef = itera.next();
+                ByteBuffer columnValue = columns.get(ByteBufferUtil.string(cdef.name.duplicate()));
+                if (columnValue != null)
+                {
+                    IColumn column = new Column(cdef.name, columnValue);
+                    tuple.set(i, columnToTuple(column, cfDef, UTF8Type.instance));
+                }
+                else
+                    tuple.set(i, TupleFactory.getInstance().newTuple());
+                i++;
+            }
+            return tuple;
+        }
+        catch (InterruptedException e)
+        {
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    /** set read configuration settings */
+    public void setLocation(String location, Job job) throws IOException
+    {
+        conf = job.getConfiguration();
+        setLocationFromUri(location);
+
+        if (username != null && password != null)
+            ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
+        if (splitSize > 0)
+            ConfigHelper.setInputSplitSize(conf, splitSize);
+        if (partitionerClass != null)
+            ConfigHelper.setInputPartitioner(conf, partitionerClass);
+
+        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
+        setConnectionInformation();
+
+        CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
+        if (columns != null && !columns.trim().isEmpty())
+            CqlConfigHelper.setInputColumns(conf, columns);        
+        if (whereClause != null && !whereClause.trim().isEmpty())
+            CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+
+        if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+        {
+            try
+            {
+                ConfigHelper.setInputSplitSize(conf, Integer.valueOf(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+            }
+            catch (NumberFormatException e)
+            {
+                throw new RuntimeException("PIG_INPUT_SPLIT_SIZE is not a number", e);
+            }           
+        }
+
+        if (ConfigHelper.getInputRpcPort(conf) == 0)
+            throw new IOException("PIG_INPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
+        if (ConfigHelper.getInputInitialAddress(conf) == null)
+            throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+        if (ConfigHelper.getInputPartitioner(conf) == null)
+            throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+        if (loadSignature == null)
+            loadSignature = location;
+
+        initSchema(loadSignature);
+    }
+
+    /** set store configuration settings */
+    public void setStoreLocation(String location, Job job) throws IOException
+    {
+        conf = job.getConfiguration();
+        setLocationFromUri(location);
+
+        if (username != null && password != null)
+            ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
+        if (splitSize > 0)
+            ConfigHelper.setInputSplitSize(conf, splitSize);
+        if (partitionerClass != null)
+            ConfigHelper.setOutputPartitioner(conf, partitionerClass);
+
+        ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
+        CqlConfigHelper.setOutputCql(conf, outputQuery);
+
+        setConnectionInformation();
+
+        if (ConfigHelper.getOutputRpcPort(conf) == 0)
+            throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
+        if (ConfigHelper.getOutputInitialAddress(conf) == null)
+            throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+        if (ConfigHelper.getOutputPartitioner(conf) == null)
+            throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+
+        initSchema(storeSignature);
+    }
+    
+    /** schema: ((name, value), (name, value), (name, value)) where keys are in the front. */
+    public ResourceSchema getSchema(String location, Job job) throws IOException
+    {
+        setLocation(location, job);
+        CfDef cfDef = getCfDef(loadSignature);
+
+        // top-level schema, no type
+        ResourceSchema schema = new ResourceSchema();
+
+        // get default marshallers and validators
+        Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+        Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfDef);
+
+        // will contain all fields for this schema
+        List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
+
+        // defined validators/indexes
+        for (ColumnDef cdef : cfDef.column_metadata)
+        {
+            // make a new tuple for each col/val pair
+            ResourceSchema innerTupleSchema = new ResourceSchema();
+            ResourceFieldSchema innerTupleField = new ResourceFieldSchema();
+            innerTupleField.setType(DataType.TUPLE);
+            innerTupleField.setSchema(innerTupleSchema);
+            innerTupleField.setName(new String(cdef.getName()));            
+            ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
+            idxColSchema.setName("name");
+            idxColSchema.setType(getPigType(UTF8Type.instance));
+
+            ResourceFieldSchema valSchema = new ResourceFieldSchema();
+            AbstractType validator = validators.get(cdef.name);
+            if (validator == null)
+                validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
+            valSchema.setName("value");
+            valSchema.setType(getPigType(validator));
+
+            innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
+            allSchemaFields.add(innerTupleField);
+        }
+
+        // top level schema contains everything
+        schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
+        return schema;
+    }
+
+
+    /** We use the CQL3 WHERE clause to define the partition, so do nothing here */
+    public String[] getPartitionKeys(String location, Job job)
+    {
+        return null;
+    }
+
+    /** We use the CQL3 WHERE clause to define the partition, so do nothing here */
+    public void setPartitionFilter(Expression partitionFilter)
+    {
+    }
+    
+    public void prepareToWrite(RecordWriter writer)
+    {
+        this.writer = writer;
+    }
+
+    /** output: (((name, value), (name, value)), (value ... value), (value...value)) */
+    public void putNext(Tuple t) throws IOException
+    {
+        if (t.size() < 1)
+        {
+            // simply nothing here, we can't even delete without a key
+            logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+            return;
+        }
+
+        if (t.getType(0) == DataType.TUPLE)
+        {
+            Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
+            if (t.getType(1) == DataType.TUPLE)
+                cqlQueryFromTuple(key, t, 1);
+            else
+                throw new IOException("Second argument in output must be a tuple");
+        }
+        else
+            throw new IOException("First argument in output must be a tuple");
+    }
+
+    /** convert key tuple to key map */
+    private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException
+    {
+        Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>();
+        for (int i = 0; i < t.size(); i++)
+        {
+            if (t.getType(i) == DataType.TUPLE)
+            {
+                Tuple inner = (Tuple) t.get(i);
+                if (inner.size() == 2)
+                {
+                    Object name = inner.get(0);
+                    if (name != null)
+                    {
+                        keys.put(name.toString(), objToBB(inner.get(1)));
+                    }
+                    else
+                        throw new IOException("Key name was empty");
+                }
+                else
+                    throw new IOException("Keys were not in name and value pairs");
+            }
+            else
+            {
+                throw new IOException("keys was not a tuple");
+            }
+        }
+        return keys;
+    }
+
+    /** send CQL query request using data from tuple */
+    private void cqlQueryFromTuple(Map<String, ByteBuffer> key, Tuple t, int offset) throws IOException
+    {
+        for (int i = offset; i < t.size(); i++)
+        {
+            if (t.getType(i) == DataType.TUPLE)
+            {
+                Tuple inner = (Tuple) t.get(i);
+                if (inner.size() > 0)
+                {
+                    
+                    List<ByteBuffer> bindedVariables = bindedVariablesFromTuple(inner);
+                    if (bindedVariables.size() > 0)
+                        sendCqlQuery(key, bindedVariables);
+                    else
+                        throw new IOException("Missing binded variables");
+                }
+            }
+            else
+            {
+                throw new IOException("Output type was not a tuple");
+            }
+        }
+    }
+
+    /** compose a list of bound variables */
+    private List<ByteBuffer> bindedVariablesFromTuple(Tuple t) throws IOException
+    {
+        List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
+        for (int i = 0; i < t.size(); i++)
+            variables.add(objToBB(t.get(i)));
+        return variables;
+    }
+
+    /** write the data by having the writer execute the CQL query */
+    private void sendCqlQuery(Map<String, ByteBuffer> key, List<ByteBuffer> bindedVariables) throws IOException
+    {
+        try
+        {
+            writer.write(key, bindedVariables);
+        }
+        catch (InterruptedException e)
+        {
+            throw new IOException(e);
+        }
+    }
+    
+    /** include key columns */
+    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+            throws InvalidRequestException,
+            UnavailableException,
+            TimedOutException,
+            SchemaDisagreementException,
+            TException,
+            CharacterCodingException
+    {
+        List<ColumnDef> keyColumns = null;
+        // get key columns
+        try
+        {
+            keyColumns = getKeysMeta(client);
+        }
+        catch(IOException e)
+        {
+            logger.error("Error in retrieving key columns" , e);   
+        }
+
+        // get other columns
+        List<ColumnDef> columns = getColumnMeta(client);
+
+        // combine all columns in a list
+        if (keyColumns != null && columns != null)
+            keyColumns.addAll(columns);
+
+        return keyColumns;
+    }
+    
+    /** cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>]
+     * [&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>]
+     * [&split_size=<size>][&partitioner=<partitioner>]] */
+    private void setLocationFromUri(String location) throws IOException
+    {
+        try
+        {
+            if (!location.startsWith("cql://"))
+                throw new Exception("Bad scheme: " + location);
+
+            String[] urlParts = location.split("\\?");
+            if (urlParts.length > 1)
+            {
+                Map<String, String> urlQuery = getQueryMap(urlParts[1]);
+
+                // each page row size
+                if (urlQuery.containsKey("page_size"))
+                    pageSize = Integer.parseInt(urlQuery.get("page_size"));
+
+                // input query select columns
+                if (urlQuery.containsKey("columns"))
+                    columns = urlQuery.get("columns");
+
+                // output prepared statement
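+                // in the URL, '#' stands in for '?' and '@' for '=', since those characters are consumed by the splits above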
+                if (urlQuery.containsKey("output_query"))
+                    outputQuery = urlQuery.get("output_query").replaceAll("#", "?").replaceAll("@", "=");
+
+                // user defined where clause
+                if (urlQuery.containsKey("where_clause"))
+                    whereClause = urlQuery.get("where_clause");
+
+                //split size
+                if (urlQuery.containsKey("split_size"))
+                    splitSize = Integer.parseInt(urlQuery.get("split_size"));
+                if (urlQuery.containsKey("partitioner"))
+                    partitionerClass = urlQuery.get("partitioner");
+            }
+            String[] parts = urlParts[0].split("/+");
+            String[] credentialsAndKeyspace = parts[1].split("@");
+            if (credentialsAndKeyspace.length > 1)
+            {
+                String[] credentials = credentialsAndKeyspace[0].split(":");
+                username = credentials[0];
+                password = credentials[1];
+                keyspace = credentialsAndKeyspace[1];
+            }
+            else
+            {
+                keyspace = parts[1];
+            }
+            column_family = parts[2];
+        }
+        catch (Exception e)
+        {
+            throw new IOException("Expected 'cql://[username:password@]<keyspace>/<columnfamily>" +
+            		                         "[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]" +
+            		                         "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>]]': " + e.getMessage());
+        }
+    }
+}
+


[2/2] git commit: Pig: support for cql3 tables Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5234

Posted by br...@apache.org.
Pig: support for cql3 tables
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5234


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/33a3d2ca
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/33a3d2ca
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/33a3d2ca

Branch: refs/heads/cassandra-1.2
Commit: 33a3d2ca57e855ff5484fb039f14d424132db93b
Parents: 54266ea
Author: Brandon Williams <br...@apache.org>
Authored: Wed Jun 26 16:52:20 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Jun 26 16:52:20 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 examples/pig/README.txt                         |  21 +-
 examples/pig/test/test_storage.pig              |   2 +-
 .../hadoop/ColumnFamilyRecordReader.java        |  39 +-
 .../hadoop/pig/AbstractCassandraStorage.java    | 770 ++++++++++++++++
 .../cassandra/hadoop/pig/CassandraStorage.java  | 892 +++++--------------
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 447 ++++++++++
 7 files changed, 1507 insertions(+), 665 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cb3fede..24d4c9e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.2.7
  * Fix serialization of the LEFT gossip value (CASSANDRA-5696)
+ * Pig: support for cql3 tables (CASSANDRA-5234)
 
 1.2.6
  * Fix tracing when operation completes before all responses arrive (CASSANDRA-5668)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/examples/pig/README.txt
----------------------------------------------------------------------
diff --git a/examples/pig/README.txt b/examples/pig/README.txt
index e3d9af6..6dc0937 100644
--- a/examples/pig/README.txt
+++ b/examples/pig/README.txt
@@ -32,7 +32,10 @@ for input and output:
 * PIG_OUTPUT_RPC_PORT : the port thrift is listening on for writing
 * PIG_OUTPUT_PARTITIONER : cluster partitioner for writing
 
-Then you can run it like this:
+CassandraStorage
+================
+
+The CassandraStorage class is for any non-CQL3 ColumnFamilies you may have.  For CQL3 support, refer to the CqlStorage section.
 
 examples/pig$ bin/pig_cassandra -x local example-script.pig
 
@@ -71,8 +74,8 @@ already exist for this to work.
 
 See the example in test/ to see how schema is inferred.
 
-Advanced Options
-================
+Advanced Options for CassandraStorage
+=====================================
 
 The following environment variables default to false but can be set to true to enable them:
 
@@ -92,3 +95,15 @@ PIG_INPUT_SPLIT_SIZE: this sets the split size passed to Hadoop, controlling
                       the amount of mapper tasks created.  This can also be set in the LOAD url by
                       adding the 'split_size=X' parameter, where X is an integer amount for the size.
 
+CqlStorage
+==========
+
+The CqlStorage class is somewhat similar to CassandraStorage, but it can work with CQL3-defined ColumnFamilies.  The main difference is in the URL format:
+
+cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>][&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>]]
+
+In grunt, the simplest example looks like this:
+
+grunt> rows = LOAD 'cql://MyKeyspace/MyColumnFamily' USING CqlStorage();
+
+CqlStorage handles wide rows automatically and thus has no separate flag for this.
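+
+To store, pass a prepared statement through 'output_query'.  Because the
+URL itself is parsed on '?' and '=', write '#' where the statement needs
+'?' and '@' where it needs '='.  A hypothetical sketch (table and column
+names are illustrative):
+
+grunt> STORE rows INTO 'cql://MyKeyspace/MyColumnFamily?output_query=UPDATE MyKeyspace.MyColumnFamily SET value @ #' USING CqlStorage();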

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/examples/pig/test/test_storage.pig
----------------------------------------------------------------------
diff --git a/examples/pig/test/test_storage.pig b/examples/pig/test/test_storage.pig
index 93dd91f..026cb02 100644
--- a/examples/pig/test/test_storage.pig
+++ b/examples/pig/test/test_storage.pig
@@ -1,4 +1,4 @@
-rows = LOAD 'cassandra://PigTest/SomeApp?widerows=true' USING CassandraStorage();
+rows = LOAD 'cassandra://PigTest/SomeApp' USING CassandraStorage();
 -- full copy
 STORE rows INTO 'cassandra://PigTest/CopyOfSomeApp' USING CassandraStorage();
 -- single tuple

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index daef8ec..701260a 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -218,19 +218,32 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         {
             try
             {
-                partitioner = FBUtilities.newPartitioner(client.describe_partitioner());
-
-                // Get the Keyspace metadata, then get the specific CF metadata
-                // in order to populate the sub/comparator.
-                KsDef ks_def = client.describe_keyspace(keyspace);
-                List<String> cfnames = new ArrayList<String>();
-                for (CfDef cfd : ks_def.cf_defs)
-                    cfnames.add(cfd.name);
-                int idx = cfnames.indexOf(cfName);
-                CfDef cf_def = ks_def.cf_defs.get(idx);
-
-                comparator = TypeParser.parse(cf_def.comparator_type);
-                subComparator = cf_def.subcomparator_type == null ? null : TypeParser.parse(cf_def.subcomparator_type);
+                partitioner = FBUtilities.newPartitioner(client.describe_partitioner());           
+                // get CF meta data
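+                // query the system table directly so CQL3-defined CFs, which thrift's describe_keyspace does not return, are found too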
+                String query = "SELECT comparator," +
+                               "       subcomparator " +
+                               "FROM system.schema_columnfamilies " +
+                               "WHERE keyspace_name = '%s' " +
+                               "  AND columnfamily_name = '%s' ";
+
+                CqlResult result = client.execute_cql3_query(
+                                        ByteBufferUtil.bytes(String.format(query, keyspace, cfName)),
+                                        Compression.NONE,
+                                        ConsistencyLevel.ONE);
+
+                Iterator<CqlRow> iteraRow = result.rows.iterator();
+                CfDef cfDef = new CfDef();
+                if (iteraRow.hasNext())
+                {
+                    CqlRow cqlRow = iteraRow.next();
+                    cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
+                    ByteBuffer subComparator = cqlRow.columns.get(1).value;
+                    if (subComparator != null)
+                        cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
+                }
+
+                comparator = TypeParser.parse(cfDef.comparator_type);
+                subComparator = cfDef.subcomparator_type == null ? null : TypeParser.parse(cfDef.subcomparator_type);
             }
             catch (ConfigurationException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
new file mode 100644
index 0000000..ff575b2
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -0,0 +1,770 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.pig;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.*;
+
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
+import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.UUIDGen;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.pig.*;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A LoadStoreFunc for retrieving data from and storing data to Cassandra
+ */
+public abstract class AbstractCassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
+{
+    protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
+
+    // system environment variables that can be set to configure connection info:
+    // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
+    public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
+    public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
+    public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
+    public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
+    public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
+    public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
+    public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
+    public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
+    public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+    public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
+    public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
+    public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
+
+    protected String DEFAULT_INPUT_FORMAT;
+    protected String DEFAULT_OUTPUT_FORMAT;
+
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractCassandraStorage.class);
+
+    protected String username;
+    protected String password;
+    protected String keyspace;
+    protected String column_family;
+    protected String loadSignature;
+    protected String storeSignature;
+
+    protected Configuration conf;
+    protected String inputFormatClass;
+    protected String outputFormatClass;
+    protected int splitSize = 64 * 1024;
+    protected String partitionerClass;
+
+    public AbstractCassandraStorage()
+    {
+        super();
+    }
+
+    /** Deconstructs a composite type to a Tuple. */
+    protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException
+    {
+        List<CompositeComponent> result = comparator.deconstruct(name);
+        Tuple t = TupleFactory.getInstance().newTuple(result.size());
+        for (int i=0; i<result.size(); i++)
+            setTupleValue(t, i, result.get(i).comparator.compose(result.get(i).value));
+
+        return t;
+    }
+
+    /** convert a column to a tuple */
+    protected Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
+    {
+        Tuple pair = TupleFactory.getInstance().newTuple(2);
+
+        // name
+        if(comparator instanceof AbstractCompositeType)
+            setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,col.name()));
+        else
+            setTupleValue(pair, 0, comparator.compose(col.name()));
+
+        // value
+        if (col instanceof Column)
+        {
+            // standard
+            Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+            if (validators.get(col.name()) == null)
+            {
+                Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+                setTupleValue(pair, 1, marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value()));
+            }
+            else
+                setTupleValue(pair, 1, validators.get(col.name()).compose(col.value()));
+            return pair;
+        }
+        else
+        {
+            // super
+            ArrayList<Tuple> subcols = new ArrayList<Tuple>();
+            for (IColumn subcol : col.getSubColumns())
+                subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type())));
+
+            pair.set(1, new DefaultDataBag(subcols));
+        }
+        return pair;
+    }
+
+    /** set the value to the position of the tuple */
+    protected void setTupleValue(Tuple pair, int position, Object value) throws ExecException
+    {
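+       // a BigInteger comes from IntegerType; narrow it to int (see getPigType: IntegerType can overflow past 2**31)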
+       if (value instanceof BigInteger)
+           pair.set(position, ((BigInteger) value).intValue());
+       else if (value instanceof ByteBuffer)
+           pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
+       else if (value instanceof UUID)
+           pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value)));
+       else if (value instanceof Date)
+           pair.set(position, DateType.instance.decompose((Date) value).getLong());
+       else
+           pair.set(position, value);
+    }
+
+    /** get the columnfamily definition for the signature */
+    protected CfDef getCfDef(String signature)
+    {
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
+        return cfdefFromString(property.getProperty(signature));
+    }
+
+    /** construct a map of marshaller type to cassandra data type */
+    protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
+    {
+        Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class);
+        AbstractType comparator;
+        AbstractType subcomparator;
+        AbstractType default_validator;
+        AbstractType key_validator;
+
+        comparator = parseType(cfDef.getComparator_type());
+        subcomparator = parseType(cfDef.getSubcomparator_type());
+        default_validator = parseType(cfDef.getDefault_validation_class());
+        key_validator = parseType(cfDef.getKey_validation_class());
+
+        marshallers.put(MarshallerType.COMPARATOR, comparator);
+        marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator);
+        marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator);
+        marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator);
+        return marshallers;
+    }
+
+    /** get the validators */
+    protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
+    {
+        Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
+        for (ColumnDef cd : cfDef.getColumn_metadata())
+        {
+            if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
+            {
+                AbstractType validator = null;
+                try
+                {
+                    validator = TypeParser.parse(cd.getValidation_class());
+                    validators.put(cd.name, validator);
+                }
+                catch (ConfigurationException e)
+                {
+                    throw new IOException(e);
+                }
+                catch (SyntaxException e)
+                {
+                    throw new IOException(e);
+                }
+            }
+        }
+        return validators;
+    }
+
+    /** parse the string to a cassandra data type */
+    protected AbstractType parseType(String type) throws IOException
+    {
+        try
+        {
+            // always treat counters like longs, specifically CCT.compose is not what we need
+            if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
+                    return LongType.instance;
+            return TypeParser.parse(type);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+        catch (SyntaxException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public InputFormat getInputFormat()
+    {
+        try
+        {
+            return FBUtilities.construct(inputFormatClass, "inputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** decompose the URL query string into a parameter map */
+    public static Map<String, String> getQueryMap(String query)
+    {
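+        // values must not themselves contain '&' or '=' (CqlStorage works around this for output_query via its '#'/'@' escapes)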
+        String[] params = query.split("&");
+        Map<String, String> map = new HashMap<String, String>();
+        for (String param : params)
+        {
+            String[] keyValue = param.split("=");
+            map.put(keyValue[0], keyValue[1]);
+        }
+        return map;
+    }
+
+    /** set hadoop cassandra connection settings */
+    protected void setConnectionInformation() throws IOException
+    {
+        if (System.getenv(PIG_RPC_PORT) != null)
+        {
+            ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+            ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+        }
+
+        if (System.getenv(PIG_INPUT_RPC_PORT) != null)
+            ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
+        if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
+            ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
+
+        if (System.getenv(PIG_INITIAL_ADDRESS) != null)
+        {
+            ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+            ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+        }
+        if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
+            ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
+        if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
+            ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
+
+        if (System.getenv(PIG_PARTITIONER) != null)
+        {
+            ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+            ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+        }
+        if(System.getenv(PIG_INPUT_PARTITIONER) != null)
+            ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
+        if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
+            ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
+        if (System.getenv(PIG_INPUT_FORMAT) != null)
+            inputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT));
+        else
+            inputFormatClass = DEFAULT_INPUT_FORMAT;
+        if (System.getenv(PIG_OUTPUT_FORMAT) != null)
+            outputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT));
+        else
+            outputFormatClass = DEFAULT_OUTPUT_FORMAT;
+    }
+
+    /** get the full class name */
+    protected String getFullyQualifiedClassName(String classname)
+    {
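+        // e.g. "ColumnFamilyInputFormat" becomes "org.apache.cassandra.hadoop.ColumnFamilyInputFormat"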
+        return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
+    }
+
+    /** get the pig type for the cassandra data type */
+    protected byte getPigType(AbstractType type)
+    {
+        if (type instanceof LongType || type instanceof DateType) // DateType is bad and it should feel bad
+            return DataType.LONG;
+        else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
+            return DataType.INTEGER;
+        else if (type instanceof AsciiType)
+            return DataType.CHARARRAY;
+        else if (type instanceof UTF8Type)
+            return DataType.CHARARRAY;
+        else if (type instanceof FloatType)
+            return DataType.FLOAT;
+        else if (type instanceof DoubleType)
+            return DataType.DOUBLE;
+        else if (type instanceof AbstractCompositeType )
+            return DataType.TUPLE;
+
+        return DataType.BYTEARRAY;
+    }
+
+    public ResourceStatistics getStatistics(String location, Job job)
+    {
+        return null;
+    }
+
+    @Override
+    public String relativeToAbsolutePath(String location, Path curDir) throws IOException
+    {
+        return location;
+    }
+
+    @Override
+    public void setUDFContextSignature(String signature)
+    {
+        this.loadSignature = signature;
+    }
+
+    /** StoreFunc methods */
+    public void setStoreFuncUDFContextSignature(String signature)
+    {
+        this.storeSignature = signature;
+    }
+
+    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+    {
+        return relativeToAbsolutePath(location, curDir);
+    }
+
+    /** output format */
+    public OutputFormat getOutputFormat()
+    {
+        try
+        {
+            return FBUtilities.construct(outputFormatClass, "outputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void checkSchema(ResourceSchema schema) throws IOException
+    {
+        // we don't care about types, they all get casted to ByteBuffers
+    }
+
+    /** convert object to ByteBuffer */
+    protected ByteBuffer objToBB(Object o)
+    {
+        if (o == null)
+            return null;
+        if (o instanceof java.lang.String)
+            return ByteBuffer.wrap(new DataByteArray((String)o).get());
+        if (o instanceof Integer)
+            return Int32Type.instance.decompose((Integer)o);
+        if (o instanceof Long)
+            return LongType.instance.decompose((Long)o);
+        if (o instanceof Float)
+            return FloatType.instance.decompose((Float)o);
+        if (o instanceof Double)
+            return DoubleType.instance.decompose((Double)o);
+        if (o instanceof UUID)
+            return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+        if (o instanceof Tuple)
+        {
+            List<Object> objects = ((Tuple)o).getAll();
+            List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+            int totalLength = 0;
+            for(Object sub : objects)
+            {
+                ByteBuffer buffer = objToBB(sub);
+                serialized.add(buffer);
+                totalLength += 2 + buffer.remaining() + 1;
+            }
+            ByteBuffer out = ByteBuffer.allocate(totalLength);
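+            // each component: 2-byte big-endian length, the value bytes, then an end-of-component byte (CompositeType layout)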
+            for (ByteBuffer bb : serialized)
+            {
+                int length = bb.remaining();
+                out.put((byte) ((length >> 8) & 0xFF));
+                out.put((byte) (length & 0xFF));
+                out.put(bb);
+                out.put((byte) 0);
+            }
+            out.flip();
+            return out;
+        }
+
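+        // anything else is assumed to be a pig DataByteArray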
+        return ByteBuffer.wrap(((DataByteArray) o).get());
+    }
+
+    public void cleanupOnFailure(String failure, Job job)
+    {
+    }
+
+    /** Methods to get the column family schema from Cassandra */
+    protected void initSchema(String signature)
+    {
+        Properties properties = UDFContext.getUDFContext().getUDFProperties(AbstractCassandraStorage.class);
+
+        // Only get the schema if we haven't already gotten it
+        if (!properties.containsKey(signature))
+        {
+            try
+            {
+                Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
+                client.set_keyspace(keyspace);
+
+                if (username != null && password != null)
+                {
+                    Map<String, String> credentials = new HashMap<String, String>(2);
+                    credentials.put(IAuthenticator.USERNAME_KEY, username);
+                    credentials.put(IAuthenticator.PASSWORD_KEY, password);
+
+                    try
+                    {
+                        client.login(new AuthenticationRequest(credentials));
+                    }
+                    catch (AuthenticationException e)
+                    {
+                        logger.error("Authentication exception: invalid username and/or password");
+                        throw new RuntimeException(e);
+                    }
+                    catch (AuthorizationException e)
+                    {
+                        throw new AssertionError(e); // never actually throws AuthorizationException.
+                    }
+                }
+
+                // compose the CfDef for the columnfamily
+                CfDef cfDef = getCfDef(client);
+
+                if (cfDef != null)
+                    properties.setProperty(signature, cfdefToString(cfDef));
+                else
+                    throw new RuntimeException(String.format("Column family '%s' not found in keyspace '%s'",
+                                                             column_family,
+                                                             keyspace));
+            }
+            catch (TException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (InvalidRequestException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (UnavailableException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (TimedOutException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (SchemaDisagreementException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /** convert CfDef to string */
+    protected static String cfdefToString(CfDef cfDef)
+    {
+        assert cfDef != null;
+        // this is so awful it's kind of cool!
+        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+        try
+        {
+            return Hex.bytesToHex(serializer.serialize(cfDef));
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** convert string back to CfDef */
+    protected static CfDef cfdefFromString(String st)
+    {
+        assert st != null;
+        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+        CfDef cfDef = new CfDef();
+        try
+        {
+            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return cfDef;
+    }
+
+    /** return the CfDef for the column family */
+    protected CfDef getCfDef(Cassandra.Client client)
+            throws InvalidRequestException,
+                   UnavailableException,
+                   TimedOutException,
+                   SchemaDisagreementException,
+                   TException,
+                   CharacterCodingException
+    {
+        // get CF meta data
+        String query = "SELECT type, " +
+                       "       comparator," +
+                       "       subcomparator," +
+                       "       default_validator, " +
+                       "       key_validator," +
+                       "       key_aliases " +
+                       "FROM system.schema_columnfamilies " +
+                       "WHERE keyspace_name = '%s' " +
+                       "  AND columnfamily_name = '%s' ";
+
+        CqlResult result = client.execute_cql3_query(
+                                ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
+                                Compression.NONE,
+                                ConsistencyLevel.ONE);
+
+        if (result == null || result.rows == null || result.rows.isEmpty())
+            return null;
+
+        Iterator<CqlRow> iteraRow = result.rows.iterator();
+        CfDef cfDef = new CfDef();
+        cfDef.keyspace = keyspace;
+        cfDef.name = column_family;
+        boolean cql3Table = false;
+        if (iteraRow.hasNext())
+        {
+            CqlRow cqlRow = iteraRow.next();
+
+            cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
+            cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value);
+            ByteBuffer subComparator = cqlRow.columns.get(2).value;
+            if (subComparator != null)
+                cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
+            cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value);
+            cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value);
+            List<String> keys = null;
+            if (cqlRow.columns.get(5).value != null)
+            {
+                String keyAliases = ByteBufferUtil.string(cqlRow.columns.get(5).value);
+                keys = FBUtilities.fromJsonList(keyAliases);
+            }
+            // non-empty key aliases mean this is a CQL3 table
+            if (keys != null && keys.size() > 0)
+                cql3Table = true;
+        }
+        cfDef.column_metadata = getColumnMetadata(client, cql3Table);
+        return cfDef;
+    }
+
+    /** get a list of columns */
+    protected abstract List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table)
+            throws InvalidRequestException,
+            UnavailableException,
+            TimedOutException,
+            SchemaDisagreementException,
+            TException,
+            CharacterCodingException;
+
+    /** get column meta data */
+    protected List<ColumnDef> getColumnMeta(Cassandra.Client client)
+            throws InvalidRequestException,
+            UnavailableException,
+            TimedOutException,
+            SchemaDisagreementException,
+            TException,
+            CharacterCodingException
+    {
+        String query = "SELECT column_name, " +
+                       "       validator, " +
+                       "       index_type " +
+                       "FROM system.schema_columns " +
+                       "WHERE keyspace_name = '%s' " +
+                       "  AND columnfamily_name = '%s'";
+
+        CqlResult result = client.execute_cql3_query(
+                                   ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
+                                   Compression.NONE,
+                                   ConsistencyLevel.ONE);
+
+        List<CqlRow> rows = result.rows;
+        List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
+        if (rows == null || rows.isEmpty())
+            return columnDefs;
+
+        Iterator<CqlRow> iterator = rows.iterator();
+        while (iterator.hasNext())
+        {
+            CqlRow row = iterator.next();
+            ColumnDef cDef = new ColumnDef();
+            cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value));
+            cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value);
+            ByteBuffer indexType = row.getColumns().get(2).value;
+            if (indexType != null)
+                cDef.index_type = getIndexType(ByteBufferUtil.string(indexType));
+            columnDefs.add(cDef);
+        }
+        return columnDefs;
+    }
+
+    /** get keys meta data  */
+    protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
+            throws InvalidRequestException,
+            UnavailableException,
+            TimedOutException,
+            SchemaDisagreementException,
+            TException,
+            IOException
+    {
+        String query = "SELECT key_aliases, " +
+                       "       column_aliases, " +
+                       "       key_validator, " +
+                       "       comparator, " +
+                       "       keyspace_name, " +
+                       "       value_alias, " +
+                       "       default_validator  " +
+                       "FROM system.schema_columnfamilies " +
+                       "WHERE keyspace_name = '%s'" +
+                       "  AND columnfamily_name = '%s' ";
+
+        CqlResult result = client.execute_cql3_query(
+                                            ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
+                                            Compression.NONE,
+                                            ConsistencyLevel.ONE);
+
+        if (result == null || result.rows == null || result.rows.isEmpty())
+            return null;
+
+        List<CqlRow> rows = result.rows;
+        Iterator<CqlRow> iteraRow = rows.iterator();
+        List<ColumnDef> keys = new ArrayList<ColumnDef>();
+        if (iteraRow.hasNext())
+        {
+            CqlRow cqlRow = iteraRow.next();
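+            // columns arrive in SELECT order: 0=key_aliases, 1=column_aliases, 2=key_validator, 3=comparator, 4=keyspace_name, 5=value_alias, 6=default_validator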
+            String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
+            logger.debug("Found ksDef name: {}", name);
+            String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
+
+            logger.debug("partition keys: " + keyString);
+            List<String> keyNames = FBUtilities.fromJsonList(keyString);
+ 
+            Iterator<String> iterator = keyNames.iterator();
+            while (iterator.hasNext())
+            {
+                ColumnDef cDef = new ColumnDef();
+                cDef.name = ByteBufferUtil.bytes(iterator.next());
+                keys.add(cDef);
+            }
+
+            keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
+
+            logger.debug("cluster keys: " + keyString);
+            keyNames = FBUtilities.fromJsonList(keyString);
+
+            iterator = keyNames.iterator();
+            while (iterator.hasNext())
+            {
+                ColumnDef cDef = new ColumnDef();
+                cDef.name = ByteBufferUtil.bytes(iterator.next());
+                keys.add(cDef);
+            }
+
+            String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
+            logger.debug("row key validator: " + validator);
+            AbstractType<?> keyValidator = parseType(validator);
+
+            Iterator<ColumnDef> keyItera = keys.iterator();
+            if (keyValidator instanceof CompositeType)
+            {
+                Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
+                while (typeItera.hasNext())
+                    keyItera.next().validation_class = typeItera.next().toString();
+            }
+            else
+                keyItera.next().validation_class = keyValidator.toString();
+
+            validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
+            logger.debug("cluster key validator: " + validator);
+
+            if (keyItera.hasNext() && validator != null && !validator.isEmpty())
+            {
+                AbstractType<?> clusterKeyValidator = parseType(validator);
+
+                if (clusterKeyValidator instanceof CompositeType)
+                {
+                    Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
+                    while (keyItera.hasNext())
+                        keyItera.next().validation_class = typeItera.next().toString();
+                }
+                else
+                    keyItera.next().validation_class = clusterKeyValidator.toString();
+            }
+
+            // compact value_alias column
+            if (cqlRow.columns.get(5).value != null)
+            {
+                try
+                {
+                    String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
+                    logger.debug("default validator: " + compactValidator);
+                    AbstractType<?> defaultValidator = parseType(compactValidator);
+
+                    ColumnDef cDef = new ColumnDef();
+                    cDef.name = cqlRow.columns.get(5).value;
+                    cDef.validation_class = defaultValidator.toString();
+                    keys.add(cDef);
+                }
+                catch (Exception e)
+                {
+                    // no compact column at value_alias
+                }
+            }
+
+        }
+        return keys;
+    }
+
+    /** get index type from string */
+    protected IndexType getIndexType(String type)
+    {
+        type = type.toLowerCase();
+        if ("keys".equals(type))
+            return IndexType.KEYS;
+        else if("custom".equals(type))
+            return IndexType.CUSTOM;
+        else if("composites".equals(type))
+            return IndexType.COMPOSITES;
+        else
+            return null;
+    }
+}
+
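
A minimal Pig session against the new loader might look like the
following (the cql:// URI, keyspace, table and page size here are
illustrative, not part of the patch):

    grunt> rows = LOAD 'cql://cql3ks/test?page_size=1000'
                  USING org.apache.cassandra.hadoop.pig.CqlStorage();
    grunt> DUMP rows;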

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a3d2ca/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 6490d05..ed445a2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -18,34 +18,25 @@
 package org.apache.cassandra.hadoop.pig;
 
 import java.io.IOException;
-import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
 
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
 import org.apache.cassandra.hadoop.*;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Hex;
-import org.apache.cassandra.utils.UUIDGen;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.*;
-import org.apache.pig.*;
-import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.Expression;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
@@ -57,30 +48,11 @@ import org.slf4j.LoggerFactory;
  *
  * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
  */
-public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
+public class CassandraStorage extends AbstractCassandraStorage
 {
-    private enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
-
-    // system environment variables that can be set to configure connection info:
-    // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
-    public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
-    public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
-    public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
-    public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
-    public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
-    public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
-    public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
-    public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
-    public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
-    public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
-    public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
     public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
     public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
     public final static String PIG_USE_SECONDARY = "PIG_USE_SECONDARY";
-    public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
-
-    private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
-    private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
 
     private final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
 
@@ -91,22 +63,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private ByteBuffer slice_end = BOUND;
     private boolean slice_reverse = false;
     private boolean allow_deletes = false;
-    private String username;
-    private String password;
-    private String keyspace;
-    private String column_family;
-    private String loadSignature;
-    private String storeSignature;
-
-    private Configuration conf;
+
     private RecordReader<ByteBuffer, Map<ByteBuffer, IColumn>> reader;
     private RecordWriter<ByteBuffer, List<Mutation>> writer;
-    private String inputFormatClass;
-    private String outputFormatClass;
-    private int limit;
+
     private boolean widerows = false;
     private boolean usePartitionFilter = false;
-    private int splitSize = 64 * 1024;
+    private int limit;
+    
     // wide row hacks
     private ByteBuffer lastKey;
     private Map<ByteBuffer,IColumn> lastRow;
@@ -117,13 +81,13 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         this(1024);
     }
 
-    /**
-     * @param limit number of columns to fetch in a slice
-     */
+    /** @param limit number of columns to fetch in a slice */
     public CassandraStorage(int limit)
     {
         super();
         this.limit = limit;
+        DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
+        DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
     }
 
     public int getLimit()
@@ -131,6 +95,12 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         return limit;
     }
 
+    public void prepareToRead(RecordReader reader, PigSplit split)
+    {
+        this.reader = reader;
+    }
+
+    /** read wide row */
     public Tuple getNextWide() throws IOException
     {
         CfDef cfDef = getCfDef(loadSignature);
@@ -229,6 +199,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     }
 
+    /** read next row */
     @Override
     public Tuple getNext() throws IOException
     {
         if (widerows)
@@ -289,315 +260,18 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         }
     }
 
-    /**
-     *  Deconstructs a composite type to a Tuple.
-     */
-    private Tuple composeComposite( AbstractCompositeType comparator, ByteBuffer name ) throws IOException
-    {
-        List<CompositeComponent> result = comparator.deconstruct( name );
-
-        Tuple t = TupleFactory.getInstance().newTuple( result.size() );
-
-        for( int i = 0; i < result.size(); i++ )
-        {
-            setTupleValue( t, i, result.get(i).comparator.compose( result.get(i).value ) );
-        }
-
-        return t;
-    }
-
-    private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
-    {
-        Tuple tuple = TupleFactory.getInstance().newTuple(1);
-        addKeyToTuple(tuple, key, cfDef, comparator);
-        return tuple;
-    }
-
-    private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
-    {
-        if( comparator instanceof AbstractCompositeType )
-        {
-            setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key));
-        }
-        else
-        {
-            setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key));
-        }
-
-    }
-
-    private Tuple columnToTuple(IColumn col, CfDef cfDef, AbstractType comparator) throws IOException
-    {
-        Tuple pair = TupleFactory.getInstance().newTuple(2);
-
-        if( comparator instanceof AbstractCompositeType )
-        {
-            setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,col.name()));
-        }
-        else
-        {
-            setTupleValue(pair, 0, comparator.compose(col.name()));
-        }
-        if (col instanceof Column)
-        {
-            // standard
-            Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-
-            if (validators.get(col.name()) == null)
-            {
-                Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
-                setTupleValue(pair, 1, marshallers.get(MarshallerType.DEFAULT_VALIDATOR).compose(col.value()));
-            }
-            else
-                setTupleValue(pair, 1, validators.get(col.name()).compose(col.value()));
-            return pair;
-        }
-        else
-        {
-            // super
-            ArrayList<Tuple> subcols = new ArrayList<Tuple>();
-            for (IColumn subcol : col.getSubColumns())
-                subcols.add(columnToTuple(subcol, cfDef, parseType(cfDef.getSubcomparator_type())));
-
-            pair.set(1, new DefaultDataBag(subcols));
-        }
-        return pair;
-    }
-
-    private void setTupleValue(Tuple pair, int position, Object value) throws ExecException
-    {
-       if (value instanceof BigInteger)
-           pair.set(position, ((BigInteger) value).intValue());
-       else if (value instanceof ByteBuffer)
-           pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
-       else if (value instanceof UUID)
-           pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value)));
-       else if (value instanceof Date)
-           pair.set(position, DateType.instance.decompose((Date) value).getLong());
-       else
-           pair.set(position, value);
-    }
-
-    private CfDef getCfDef(String signature)
-    {
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
-        return cfdefFromString(property.getProperty(signature));
-    }
-
-    private List<IndexExpression> getIndexExpressions()
-    {
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
-        if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
-            return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
-        else
-            return null;
-    }
-
-    private Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
-    {
-        Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class);
-        AbstractType comparator;
-        AbstractType subcomparator;
-        AbstractType default_validator;
-        AbstractType key_validator;
-
-        comparator = parseType(cfDef.getComparator_type());
-        subcomparator = parseType(cfDef.getSubcomparator_type());
-        default_validator = parseType(cfDef.getDefault_validation_class());
-        key_validator = parseType(cfDef.getKey_validation_class());
-
-        marshallers.put(MarshallerType.COMPARATOR, comparator);
-        marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator);
-        marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator);
-        marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator);
-        return marshallers;
-    }
-
-    private Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
-    {
-        Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
-        for (ColumnDef cd : cfDef.getColumn_metadata())
-        {
-            if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
-            {
-                AbstractType validator = null;
-                try
-                {
-                    validator = TypeParser.parse(cd.getValidation_class());
-                    validators.put(cd.name, validator);
-                }
-                catch (ConfigurationException e)
-                {
-                    throw new IOException(e);
-                }
-                catch (SyntaxException e)
-                {
-                    throw new IOException(e);
-                }
-            }
-        }
-        return validators;
-    }
-
-    private AbstractType parseType(String type) throws IOException
-    {
-        try
-        {
-            // always treat counters like longs, specifically CCT.compose is not what we need
-            if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
-                    return LongType.instance;
-            return TypeParser.parse(type);
-        }
-        catch (ConfigurationException e)
-        {
-            throw new IOException(e);
-        }
-        catch (SyntaxException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    public InputFormat getInputFormat()
-    {
-        try
-        {
-            return FBUtilities.construct(inputFormatClass, "inputformat");
-        }
-        catch (ConfigurationException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split)
-    {
-        this.reader = reader;
-    }
-
-    public static Map<String, String> getQueryMap(String query)
-    {
-        String[] params = query.split("&");
-        Map<String, String> map = new HashMap<String, String>();
-        for (String param : params)
-        {
-            String[] keyValue = param.split("=");
-            map.put(keyValue[0], keyValue[1]);
-        }
-        return map;
-    }
-
-    private void setLocationFromUri(String location) throws IOException
+    /** set Hadoop/Cassandra connection settings */
+    protected void setConnectionInformation() throws IOException
     {
-        try
-        {
-            if (!location.startsWith("cassandra://"))
-                throw new Exception("Bad scheme.");
-            String[] urlParts = location.split("\\?");
-            if (urlParts.length > 1)
-            {
-                Map<String, String> urlQuery = getQueryMap(urlParts[1]);
-                AbstractType comparator = BytesType.instance;
-                if (urlQuery.containsKey("comparator"))
-                    comparator = TypeParser.parse(urlQuery.get("comparator"));
-                if (urlQuery.containsKey("slice_start"))
-                    slice_start = comparator.fromString(urlQuery.get("slice_start"));
-                if (urlQuery.containsKey("slice_end"))
-                    slice_end = comparator.fromString(urlQuery.get("slice_end"));
-                if (urlQuery.containsKey("reversed"))
-                    slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
-                if (urlQuery.containsKey("limit"))
-                    limit = Integer.parseInt(urlQuery.get("limit"));
-                if (urlQuery.containsKey("allow_deletes"))
-                    allow_deletes = Boolean.parseBoolean(urlQuery.get("allow_deletes"));
-                if (urlQuery.containsKey("widerows"))
-                    widerows = Boolean.parseBoolean(urlQuery.get("widerows"));
-                if (urlQuery.containsKey("use_secondary"))
-                    usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
-                if (urlQuery.containsKey("split_size"))
-                    splitSize = Integer.parseInt(urlQuery.get("split_size"));
-            }
-            String[] parts = urlParts[0].split("/+");
-            String[] credentialsAndKeyspace = parts[1].split("@");
-            if (credentialsAndKeyspace.length > 1)
-            {
-                String[] credentials = credentialsAndKeyspace[0].split(":");
-                username = credentials[0];
-                password = credentials[1];
-                keyspace = credentialsAndKeyspace[1];
-            }
-            else
-            {
-                keyspace = parts[1];
-            }
-            column_family = parts[2];
-        }
-        catch (Exception e)
-        {
-            throw new IOException("Expected 'cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&allow_deletes=true][widerows=true][use_secondary=true]]': " + e.getMessage());
-        }
-    }
-
-    private void setConnectionInformation() throws IOException
-    {
-        if (System.getenv(PIG_RPC_PORT) != null)
-        {
-            ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
-            ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
-        }
-
-        if (System.getenv(PIG_INPUT_RPC_PORT) != null)
-            ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
-        if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
-            ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
-
-        if (System.getenv(PIG_INITIAL_ADDRESS) != null)
-        {
-            ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
-            ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
-        }
-        if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
-            ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
-        if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
-            ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
-
-        if (System.getenv(PIG_PARTITIONER) != null)
-        {
-            ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
-            ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
-        }
-        if(System.getenv(PIG_INPUT_PARTITIONER) != null)
-            ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
-        if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
-            ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
-        if (System.getenv(PIG_INPUT_FORMAT) != null)
-            inputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT));
-        else
-            inputFormatClass = DEFAULT_INPUT_FORMAT;
-        if (System.getenv(PIG_OUTPUT_FORMAT) != null)
-            outputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT));
-        else
-            outputFormatClass = DEFAULT_OUTPUT_FORMAT;
+        super.setConnectionInformation();
         if (System.getenv(PIG_ALLOW_DELETES) != null)
             allow_deletes = Boolean.parseBoolean(System.getenv(PIG_ALLOW_DELETES));
     }
 
-    private String getFullyQualifiedClassName(String classname)
-    {
-        return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
-    }
-
-    @Override
+    /** set read configuration settings */
     public void setLocation(String location, Job job) throws IOException
     {
         conf = job.getConfiguration();
-        
-        // don't combine mappers to a single mapper per node
-        conf.setBoolean("pig.noSplitCombination", true);
         setLocationFromUri(location);
 
         if (ConfigHelper.getInputSlicePredicate(conf) == null)
@@ -616,20 +290,22 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             {
                 ConfigHelper.setInputSplitSize(conf, Integer.valueOf(System.getenv(PIG_INPUT_SPLIT_SIZE)));
             }
-            catch(NumberFormatException e)
+            catch (NumberFormatException e)
             {
                 throw new RuntimeException("PIG_INPUT_SPLIT_SIZE is not a number", e);
             }           
-        }
+        } 
 
         if (usePartitionFilter && getIndexExpressions() != null)
             ConfigHelper.setInputRange(conf, getIndexExpressions());
 
         if (username != null && password != null)
             ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
-        
+
         if (splitSize > 0)
             ConfigHelper.setInputSplitSize(conf, splitSize);
+        if (partitionerClass != null)
+            ConfigHelper.setInputPartitioner(conf, partitionerClass);
 
         ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
         setConnectionInformation();
@@ -645,6 +321,40 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         initSchema(loadSignature);
     }
 
+    /** set store configuration settings */
+    public void setStoreLocation(String location, Job job) throws IOException
+    {
+        conf = job.getConfiguration();
+        
+        // don't combine mappers to a single mapper per node
+        conf.setBoolean("pig.noSplitCombination", true);
+        setLocationFromUri(location);
+
+        if (username != null && password != null)
+            ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
+        if (splitSize > 0)
+            ConfigHelper.setInputSplitSize(conf, splitSize);
+        if (partitionerClass != null)
+            ConfigHelper.setOutputPartitioner(conf, partitionerClass);
+
+        ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
+        setConnectionInformation();
+
+        if (ConfigHelper.getOutputRpcPort(conf) == 0)
+            throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
+        if (ConfigHelper.getOutputInitialAddress(conf) == null)
+            throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+        if (ConfigHelper.getOutputPartitioner(conf) == null)
+            throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+
+        // we have to do this again here for the check in writeColumnsFromTuple
+        if (System.getenv(PIG_USE_SECONDARY) != null)
+            usePartitionFilter = Boolean.valueOf(System.getenv(PIG_USE_SECONDARY));
+
+        initSchema(storeSignature);
+    }
+
+    /** define the Pig schema for the column family */
     public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
@@ -695,30 +405,34 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         // add the key first, then the indexed columns, and finally the bag
         allSchemaFields.add(keyFieldSchema);
 
-        // defined validators/indexes
-        for (ColumnDef cdef : cfDef.column_metadata)
+        if (!widerows)
         {
-            // make a new tuple for each col/val pair
-            ResourceSchema innerTupleSchema = new ResourceSchema();
-            ResourceFieldSchema innerTupleField = new ResourceFieldSchema();
-            innerTupleField.setType(DataType.TUPLE);
-            innerTupleField.setSchema(innerTupleSchema);
-            innerTupleField.setName(new String(cdef.getName()));
-
-            ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
-            idxColSchema.setName("name");
-            idxColSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR)));
-
-            ResourceFieldSchema valSchema = new ResourceFieldSchema();
-            AbstractType validator = validators.get(cdef.name);
-            if (validator == null)
-                validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
-            valSchema.setName("value");
-            valSchema.setType(getPigType(validator));
-
-            innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
-            allSchemaFields.add(innerTupleField);
+            // defined validators/indexes
+            for (ColumnDef cdef : cfDef.column_metadata)
+            {
+                // make a new tuple for each col/val pair
+                ResourceSchema innerTupleSchema = new ResourceSchema();
+                ResourceFieldSchema innerTupleField = new ResourceFieldSchema();
+                innerTupleField.setType(DataType.TUPLE);
+                innerTupleField.setSchema(innerTupleSchema);
+                innerTupleField.setName(new String(cdef.getName()));
+
+                ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
+                idxColSchema.setName("name");
+                idxColSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR)));
+
+                ResourceFieldSchema valSchema = new ResourceFieldSchema();
+                AbstractType validator = validators.get(cdef.name);
+                if (validator == null)
+                    validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
+                valSchema.setName("value");
+                valSchema.setType(getPigType(validator));
+
+                innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
+                allSchemaFields.add(innerTupleField);
+            }   
         }
+
         // bag at the end for unknown columns
         allSchemaFields.add(bagField);
 
@@ -741,31 +455,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         return schema;
     }
 
-    private byte getPigType(AbstractType type)
-    {
-        if (type instanceof LongType || type instanceof DateType) // DateType is bad and it should feel bad
-            return DataType.LONG;
-        else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
-            return DataType.INTEGER;
-        else if (type instanceof AsciiType)
-            return DataType.CHARARRAY;
-        else if (type instanceof UTF8Type)
-            return DataType.CHARARRAY;
-        else if (type instanceof FloatType)
-            return DataType.FLOAT;
-        else if (type instanceof DoubleType)
-            return DataType.DOUBLE;
-        else if (type instanceof AbstractCompositeType )
-            return DataType.TUPLE;
-
-        return DataType.BYTEARRAY;
-    }
-
-    public ResourceStatistics getStatistics(String location, Job job)
-    {
-        return null;
-    }
-
+    /** return partition keys */
     public String[] getPartitionKeys(String location, Job job)
     {
         if (!usePartitionFilter)
@@ -779,170 +469,21 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         return partitionKeys;
     }
 
+    /** set the partition filter; Pig pushes filters on indexed columns down through this method */
     public void setPartitionFilter(Expression partitionFilter)
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
         property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
     }
 
-    private List<IndexExpression> filterToIndexExpressions(Expression expression)
-    {
-        List<IndexExpression> indexExpressions = new ArrayList<IndexExpression>();
-        Expression.BinaryExpression be = (Expression.BinaryExpression)expression;
-        ByteBuffer name = ByteBuffer.wrap(be.getLhs().toString().getBytes());
-        ByteBuffer value = ByteBuffer.wrap(be.getRhs().toString().getBytes());
-        switch (expression.getOpType())
-        {
-            case OP_EQ:
-                indexExpressions.add(new IndexExpression(name, IndexOperator.EQ, value));
-                break;
-            case OP_GE:
-                indexExpressions.add(new IndexExpression(name, IndexOperator.GTE, value));
-                break;
-            case OP_GT:
-                indexExpressions.add(new IndexExpression(name, IndexOperator.GT, value));
-                break;
-            case OP_LE:
-                indexExpressions.add(new IndexExpression(name, IndexOperator.LTE, value));
-                break;
-            case OP_LT:
-                indexExpressions.add(new IndexExpression(name, IndexOperator.LT, value));
-                break;
-            case OP_AND:
-                indexExpressions.addAll(filterToIndexExpressions(be.getLhs()));
-                indexExpressions.addAll(filterToIndexExpressions(be.getRhs()));
-                break;
-            default:
-                throw new RuntimeException("Unsupported expression type: " + expression.getOpType().name());
-        }
-        return indexExpressions;
-    }
-
-    private List<ColumnDef> getIndexes()
-    {
-        CfDef cfdef = getCfDef(loadSignature);
-        List<ColumnDef> indexes = new ArrayList<ColumnDef>();
-        for (ColumnDef cdef : cfdef.column_metadata)
-        {
-            if (cdef.index_type != null)
-                indexes.add(cdef);
-        }
-        return indexes;
-    }
-
-    @Override
-    public String relativeToAbsolutePath(String location, Path curDir) throws IOException
-    {
-        return location;
-    }
-
-    @Override
-    public void setUDFContextSignature(String signature)
-    {
-        this.loadSignature = signature;
-    }
-
-    /* StoreFunc methods */
-    public void setStoreFuncUDFContextSignature(String signature)
-    {
-        this.storeSignature = signature;
-    }
-
-    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
-    {
-        return relativeToAbsolutePath(location, curDir);
-    }
-
-    public void setStoreLocation(String location, Job job) throws IOException
-    {
-        conf = job.getConfiguration();
-        setLocationFromUri(location);
-
-        if (username != null && password != null)
-            ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
-
-        ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
-        setConnectionInformation();
-
-        if (ConfigHelper.getOutputRpcPort(conf) == 0)
-            throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
-        if (ConfigHelper.getOutputInitialAddress(conf) == null)
-            throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
-        if (ConfigHelper.getOutputPartitioner(conf) == null)
-            throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
-
-        // we have to do this again here for the check in writeColumnsFromTuple
-        if (System.getenv(PIG_USE_SECONDARY) != null)
-            usePartitionFilter = Boolean.valueOf(System.getenv(PIG_USE_SECONDARY));
-
-        initSchema(storeSignature);
-    }
-
-    public OutputFormat getOutputFormat()
-    {
-        try
-        {
-            return FBUtilities.construct(outputFormatClass, "outputformat");
-        }
-        catch (ConfigurationException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void checkSchema(ResourceSchema schema) throws IOException
-    {
-        // we don't care about types, they all get casted to ByteBuffers
-    }
-
+    /** prepare writer */
     public void prepareToWrite(RecordWriter writer)
     {
         this.writer = writer;
     }
 
-    private ByteBuffer objToBB(Object o)
-    {
-        if (o == null)
-            return (ByteBuffer)o;
-        if (o instanceof java.lang.String)
-            return ByteBuffer.wrap(new DataByteArray((String)o).get());
-        if (o instanceof Integer)
-            return Int32Type.instance.decompose((Integer)o);
-        if (o instanceof Long)
-            return LongType.instance.decompose((Long)o);
-        if (o instanceof Float)
-            return FloatType.instance.decompose((Float)o);
-        if (o instanceof Double)
-            return DoubleType.instance.decompose((Double)o);
-        if (o instanceof UUID)
-            return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
-        if(o instanceof Tuple) {
-            List<Object> objects = ((Tuple)o).getAll();
-            List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
-            int totalLength = 0;
-            for(Object sub : objects)
-            {
-                ByteBuffer buffer = objToBB(sub);
-                serialized.add(buffer);
-                totalLength += 2 + buffer.remaining() + 1;
-            }
-            ByteBuffer out = ByteBuffer.allocate(totalLength);
-            for (ByteBuffer bb : serialized)
-            {
-                int length = bb.remaining();
-                out.put((byte) ((length >> 8) & 0xFF));
-                out.put((byte) (length & 0xFF));
-                out.put(bb);
-                out.put((byte) 0);
-            }
-            out.flip();
-            return out;
-        }
-
-        return ByteBuffer.wrap(((DataByteArray) o).get());
-    }
-
+    /** write next row */
     public void putNext(Tuple t) throws IOException
     {
         /*
@@ -971,6 +512,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             throw new IOException("Second argument in output must be a tuple or bag");
     }
 
+    /** write tuple data to Cassandra */
     private void writeColumnsFromTuple(ByteBuffer key, Tuple t, int offset) throws IOException
     {
         ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
@@ -993,6 +535,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             writeMutations(key, mutationList);
     }
 
+    /** compose Cassandra mutation from tuple */
     private Mutation mutationFromTuple(Tuple t) throws IOException
     {
         Mutation mutation = new Mutation();
@@ -1021,6 +564,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         return mutation;
     }
 
+    /** write bag data to Cassandra */
     private void writeColumnsFromBag(ByteBuffer key, DefaultDataBag bag) throws IOException
     {
         List<Mutation> mutationList = new ArrayList<Mutation>();
@@ -1074,6 +618,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
             writeMutations(key, mutationList);
     }
 
+    /** write mutations to Cassandra */
     private void writeMutations(ByteBuffer key, List<Mutation> mutations) throws IOException
     {
         try
@@ -1086,90 +631,65 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         }
     }
 
-    public void cleanupOnFailure(String failure, Job job)
+    /** get a list of Cassandra IndexExpressions from a Pig expression */
+    private List<IndexExpression> filterToIndexExpressions(Expression expression)
     {
+        List<IndexExpression> indexExpressions = new ArrayList<IndexExpression>();
+        Expression.BinaryExpression be = (Expression.BinaryExpression)expression;
+        ByteBuffer name = ByteBuffer.wrap(be.getLhs().toString().getBytes());
+        ByteBuffer value = ByteBuffer.wrap(be.getRhs().toString().getBytes());
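+        // Comparison operators map one-to-one onto Cassandra's IndexOperator;
+        // OP_AND recurses into both sides, so nested conjunctions flatten into
+        // a single list of index expressions.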
+        switch (expression.getOpType())
+        {
+            case OP_EQ:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.EQ, value));
+                break;
+            case OP_GE:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.GTE, value));
+                break;
+            case OP_GT:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.GT, value));
+                break;
+            case OP_LE:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.LTE, value));
+                break;
+            case OP_LT:
+                indexExpressions.add(new IndexExpression(name, IndexOperator.LT, value));
+                break;
+            case OP_AND:
+                indexExpressions.addAll(filterToIndexExpressions(be.getLhs()));
+                indexExpressions.addAll(filterToIndexExpressions(be.getRhs()));
+                break;
+            default:
+                throw new RuntimeException("Unsupported expression type: " + expression.getOpType().name());
+        }
+        return indexExpressions;
     }
 
-    /* Methods to get the column family schema from Cassandra */
-
-    private void initSchema(String signature)
+    /** get a list of columns with a defined index */
+    private List<ColumnDef> getIndexes()
     {
-        Properties properties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class);
-
-        // Only get the schema if we haven't already gotten it
-        if (!properties.containsKey(signature))
+        CfDef cfdef = getCfDef(loadSignature);
+        List<ColumnDef> indexes = new ArrayList<ColumnDef>();
+        for (ColumnDef cdef : cfdef.column_metadata)
         {
-            try
-            {
-                Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
-                client.set_keyspace(keyspace);
-
-                if (username != null && password != null)
-                {
-                    Map<String, String> credentials = new HashMap<String, String>(2);
-                    credentials.put(IAuthenticator.USERNAME_KEY, username);
-                    credentials.put(IAuthenticator.PASSWORD_KEY, password);
-
-                    try
-                    {
-                        client.login(new AuthenticationRequest(credentials));
-                    }
-                    catch (AuthenticationException e)
-                    {
-                        logger.error("Authentication exception: invalid username and/or password");
-                        throw new RuntimeException(e);
-                    }
-                    catch (AuthorizationException e)
-                    {
-                        throw new AssertionError(e); // never actually throws AuthorizationException.
-                    }
-                }
-
-                CfDef cfDef = null;
-                KsDef ksDef = client.describe_keyspace(keyspace);
-                List<CfDef> defs = ksDef.getCf_defs();
-                for (CfDef def : defs)
-                {
-                    if (column_family.equalsIgnoreCase(def.getName()))
-                    {
-                        cfDef = def;
-                        break;
-                    }
-                }
-                if (cfDef != null)
-                    properties.setProperty(signature, cfdefToString(cfDef));
-                else
-                    throw new RuntimeException(String.format("Column family '%s' not found in keyspace '%s'",
-                                                             column_family,
-                                                             keyspace));
-            }
-            catch (TException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (InvalidRequestException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (NotFoundException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
+            if (cdef.index_type != null)
+                indexes.add(cdef);
         }
+        return indexes;
     }
 
-    private static String cfdefToString(CfDef cfDef)
+    /** convert a list of index expressions to a string */
+    private static String indexExpressionsToString(List<IndexExpression> indexExpressions)
     {
-        assert cfDef != null;
-        // this is so awful it's kind of cool!
+        assert indexExpressions != null;
+        // oh, you thought cfdefToString was awful?
+        IndexClause indexClause = new IndexClause();
+        indexClause.setExpressions(indexExpressions);
+        indexClause.setStart_key("".getBytes());
         TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
         try
         {
-            return Hex.bytesToHex(serializer.serialize(cfDef));
+            return Hex.bytesToHex(serializer.serialize(indexClause));
         }
         catch (TException e)
         {
@@ -1177,54 +697,130 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         }
     }
 
-    private static CfDef cfdefFromString(String st)
+    /** convert a string to a list of index expressions */
+    private static List<IndexExpression> indexExpressionsFromString(String ie)
     {
-        assert st != null;
+        assert ie != null;
         TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
-        CfDef cfDef = new CfDef();
+        IndexClause indexClause = new IndexClause();
         try
         {
-            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
+            deserializer.deserialize(indexClause, Hex.hexToBytes(ie));
         }
         catch (TException e)
         {
             throw new RuntimeException(e);
         }
-        return cfDef;
+        return indexClause.getExpressions();
     }
 
-    private static String indexExpressionsToString(List<IndexExpression> indexExpressions)
+    /** get the list of index expressions */
+    private List<IndexExpression> getIndexExpressions()
     {
-        assert indexExpressions != null;
-        // oh, you thought cfdefToString was awful?
-        IndexClause indexClause = new IndexClause();
-        indexClause.setExpressions(indexExpressions);
-        indexClause.setStart_key("".getBytes());
-        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
-        try
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
+        if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
+            return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
+        else
+            return null;
+    }
+
+    /** get a list of columns for the column family */
+    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client, boolean cql3Table) 
+            throws InvalidRequestException, 
+            UnavailableException, 
+            TimedOutException, 
+            SchemaDisagreementException, 
+            TException,
+            CharacterCodingException
+    {
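+        // CQL3 tables expose no column metadata through the thrift CfDef, so
+        // return an empty list for them.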
+        if (cql3Table)
+            return new ArrayList<ColumnDef>();
+        
+        return getColumnMeta(client);
+    }
+
+    /** convert key to a tuple */
+    private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
+    {
+        Tuple tuple = TupleFactory.getInstance().newTuple(1);
+        addKeyToTuple(tuple, key, cfDef, comparator);
+        return tuple;
+    }
+
+    /** add key to a tuple */
+    private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
+    {
+        if (comparator instanceof AbstractCompositeType)
         {
-            return Hex.bytesToHex(serializer.serialize(indexClause));
+            setTupleValue(tuple, 0, composeComposite((AbstractCompositeType) comparator, key));
         }
-        catch (TException e)
+        else
         {
-            throw new RuntimeException(e);
+            setTupleValue(tuple, 0, getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR).compose(key));
         }
+
     }
 
-    private static List<IndexExpression> indexExpressionsFromString(String ie)
+    /** cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>
+     * [&reversed=true][&limit=1][&allow_deletes=true][&widerows=true][&use_secondary=true]
+     * [&comparator=<comparator>][&split_size=<size>][&partitioner=<partitioner>]] */
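+    // A load statement using this URI format might look like the following
+    // (keyspace and column family names are hypothetical):
+    //   rows = LOAD 'cassandra://MyKeyspace/MyColumnFamily?limit=1024&widerows=true'
+    //          USING org.apache.cassandra.hadoop.pig.CassandraStorage();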
+    private void setLocationFromUri(String location) throws IOException
     {
-        assert ie != null;
-        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
-        IndexClause indexClause = new IndexClause();
         try
         {
-            deserializer.deserialize(indexClause, Hex.hexToBytes(ie));
+            if (!location.startsWith("cassandra://"))
+                throw new Exception("Bad scheme." + location);
+            
+            String[] urlParts = location.split("\\?");
+            if (urlParts.length > 1)
+            {
+                Map<String, String> urlQuery = getQueryMap(urlParts[1]);
+                AbstractType comparator = BytesType.instance;
+                if (urlQuery.containsKey("comparator"))
+                    comparator = TypeParser.parse(urlQuery.get("comparator"));
+                if (urlQuery.containsKey("slice_start"))
+                    slice_start = comparator.fromString(urlQuery.get("slice_start"));
+                if (urlQuery.containsKey("slice_end"))
+                    slice_end = comparator.fromString(urlQuery.get("slice_end"));
+                if (urlQuery.containsKey("reversed"))
+                    slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
+                if (urlQuery.containsKey("limit"))
+                    limit = Integer.parseInt(urlQuery.get("limit"));
+                if (urlQuery.containsKey("allow_deletes"))
+                    allow_deletes = Boolean.parseBoolean(urlQuery.get("allow_deletes"));
+                if (urlQuery.containsKey("widerows"))
+                    widerows = Boolean.parseBoolean(urlQuery.get("widerows"));
+                if (urlQuery.containsKey("use_secondary"))
+                    usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
+                if (urlQuery.containsKey("split_size"))
+                    splitSize = Integer.parseInt(urlQuery.get("split_size"));
+                if (urlQuery.containsKey("partitioner"))
+                    partitionerClass = urlQuery.get("partitioner");
+            }
+            String[] parts = urlParts[0].split("/+");
+            String[] credentialsAndKeyspace = parts[1].split("@");
+            if (credentialsAndKeyspace.length > 1)
+            {
+                String[] credentials = credentialsAndKeyspace[0].split(":");
+                username = credentials[0];
+                password = credentials[1];
+                keyspace = credentialsAndKeyspace[1];
+            }
+            else
+            {
+                keyspace = parts[1];
+            }
+            column_family = parts[2];
         }
-        catch (TException e)
+        catch (Exception e)
         {
-            throw new RuntimeException(e);
+            throw new IOException("Expected 'cassandra://[username:password@]<keyspace>/<columnfamily>" +
+            		                        "[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]" +
+            		                        "[&allow_deletes=true][&widerows=true][&use_secondary=true]" +
+            		                        "[&comparator=<comparator>][&split_size=<size>][&partitioner=<partitioner>]]': " + e.getMessage());
         }
-        return indexClause.getExpressions();
     }
+
 }
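
As before, connection settings can come from the environment instead of
the URI; for example (values illustrative):

    export PIG_INITIAL_ADDRESS=localhost
    export PIG_RPC_PORT=9160
    export PIG_PARTITIONER=org.apache.cassandra.dht.Murmur3Partitioner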