Posted to commits@cassandra.apache.org by br...@apache.org on 2015/01/13 18:28:17 UTC

[2/3] cassandra git commit: Pig: Refactor and deprecate CqlStorage

Pig: Refactor and deprecate CqlStorage

Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-8599


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/359d3bb2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/359d3bb2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/359d3bb2

Branch: refs/heads/trunk
Commit: 359d3bb2a69b856e6dca8a060f6087307808cb5e
Parents: 1cb426b
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jan 13 11:24:45 2015 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jan 13 11:24:45 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 examples/pig/README.txt                         |  27 +-
 examples/pig/example-script-cql.pig             |   6 +-
 examples/pig/test/test_cql_storage.pig          |  12 +-
 .../hadoop/pig/AbstractCassandraStorage.java    |   2 +-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  | 520 +++++++++++++-
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 693 +------------------
 7 files changed, 542 insertions(+), 719 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f2e25c4..e070eaf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Pig: Refactor and deprecate CqlStorage (CASSANDRA-8599)
  * Don't reuse the same cleanup strategy for all sstables (CASSANDRA-8537)
  * Fix case-sensitivity of index name on CREATE and DROP INDEX
    statements (CASSANDRA-8365)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/examples/pig/README.txt
----------------------------------------------------------------------
diff --git a/examples/pig/README.txt b/examples/pig/README.txt
index 2ae9824..1553a9f 100644
--- a/examples/pig/README.txt
+++ b/examples/pig/README.txt
@@ -35,7 +35,7 @@ for input and output:
 CassandraStorage
 ================
 
-The CassandraStorage class is for any non-CQL3 ColumnFamilies you may have.  For CQL3 support, refer to the CqlStorage section.
+The CassandraStorage class is for any non-CQL3 ColumnFamilies you may have.  For CQL3 support, refer to the CqlNativeStorage section.
 
 examples/pig$ bin/pig_cassandra -x local example-script.pig
 
@@ -95,15 +95,24 @@ PIG_INPUT_SPLIT_SIZE: this sets the split size passed to Hadoop, controlling
                       the amount of mapper tasks created.  This can also be set in the LOAD url by
                       adding the 'split_size=X' parameter, where X is an integer amount for the size.
 
-CqlStorage
-==========
-
-The CqlStorage class is somewhat similar to CassandraStorage, but it can work with CQL3-defined ColumnFamilies.  The main difference is in the URL format:
-
-cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>][&where_clause=<clause>][&split_size=<size>][&use_secondary=true|false][&partitioner=<partitioner>]]
+CqlNativeStorage
+================
 
+The CqlNativeStorage class is somewhat similar to CassandraStorage, but it can work with CQL3-defined ColumnFamilies.  The main difference is in the URL format:
+
+cql://[username:password@]<keyspace>/<columnfamily>
+                    [?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]
+                    [&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]
+                    [&init_address=<host>][&native_port=<native_port>][&core_conns=<core_conns>]
+                    [&max_conns=<max_conns>][&min_simult_reqs=<min_simult_reqs>][&max_simult_reqs=<max_simult_reqs>]
+                    [&native_timeout=<native_timeout>][&native_read_timeout=<native_read_timeout>][&rec_buff_size=<rec_buff_size>]
+                    [&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]
+                    [&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]
+                    [&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]
+                    [&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]
+                    [columns=<columns>][where_clause=<where_clause>]]
 Which in grunt, the simplest example would look like:
 
-grunt> rows = LOAD 'cql://MyKeyspace/MyColumnFamily' USING CqlStorage();
+grunt> rows = LOAD 'cql://MyKeyspace/MyColumnFamily' USING CqlNativeStorage();
 
-CqlStorage handles wide rows automatically and thus has no separate flag for this.
+CqlNativeStorage handles wide rows automatically and thus has no separate flag for this.
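
Note on the URL format in the README hunk above: the location has the shape cql://[username:password@]<keyspace>/<columnfamily>[?name=value&...]. The sketch below only illustrates that shape. The class CqlLocationSketch and its fields are hypothetical names; this is not the parsing code CqlNativeStorage itself uses, and parameter values are left URL-encoded.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; it is not part of this patch.
final class CqlLocationSketch
{
    final String keyspace;
    final String columnFamily;
    // query parameters in the order given; values are left URL-encoded
    final Map<String, String> params = new LinkedHashMap<String, String>();

    CqlLocationSketch(String location)
    {
        String scheme = "cql://";
        if (!location.startsWith(scheme))
            throw new IllegalArgumentException("Expected a cql:// location: " + location);

        // split "[user:pass@]keyspace/columnfamily" from the optional query string
        String[] urlParts = location.substring(scheme.length()).split("\\?", 2);

        // drop the optional credentials prefix
        String path = urlParts[0];
        int at = path.lastIndexOf('@');
        if (at >= 0)
            path = path.substring(at + 1);

        String[] names = path.split("/");
        if (names.length < 2 || names[0].isEmpty() || names[1].isEmpty())
            throw new IllegalArgumentException("Expected <keyspace>/<columnfamily>: " + location);
        keyspace = names[0];
        columnFamily = names[1];

        if (urlParts.length == 2)
        {
            for (String pair : urlParts[1].split("&"))
            {
                if (pair.isEmpty())
                    continue;
                String[] kv = pair.split("=", 2);
                params.put(kv[0], kv.length == 2 ? kv[1] : "");
            }
        }
    }

    public static void main(String[] args)
    {
        CqlLocationSketch loc = new CqlLocationSketch("cql://MyKeyspace/MyColumnFamily?page_size=500&split_size=64");
        System.out.println(loc.keyspace + " " + loc.columnFamily + " " + loc.params);
    }
}
```

A trailing slash, as in 'cql://cql3ks/moredata/' from the test script, is tolerated by this sketch because the empty trailing segment is discarded by split.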

http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/examples/pig/example-script-cql.pig
----------------------------------------------------------------------
diff --git a/examples/pig/example-script-cql.pig b/examples/pig/example-script-cql.pig
index 63656a7..ef11130 100644
--- a/examples/pig/example-script-cql.pig
+++ b/examples/pig/example-script-cql.pig
@@ -1,5 +1,5 @@
--- CqlStorage
-libdata = LOAD 'cql://libdata/libout' USING CqlStorage();
+-- CqlNativeStorage
+libdata = LOAD 'cql://libdata/libout' USING CqlNativeStorage();
 book_by_mail = FILTER libdata BY C_OUT_TY == 'BM';
 
 libdata_buildings = FILTER libdata BY SQ_FEET > 0;
@@ -8,4 +8,4 @@ state_grouped = GROUP state_flat BY State;
 state_footage = FOREACH state_grouped GENERATE group AS State, SUM(state_flat.SquareFeet) AS TotalFeet:int;
 
 insert_format= FOREACH state_footage GENERATE TOTUPLE(TOTUPLE('year',2011),TOTUPLE('state',State)),TOTUPLE(TotalFeet);
-STORE insert_format INTO 'cql://libdata/libsqft?output_query=UPDATE%20libdata.libsqft%20SET%20sqft%20%3D%20%3F' USING CqlStorage;
\ No newline at end of file
+STORE insert_format INTO 'cql://libdata/libsqft?output_query=UPDATE%20libdata.libsqft%20SET%20sqft%20%3D%20%3F' USING CqlNativeStorage;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/examples/pig/test/test_cql_storage.pig
----------------------------------------------------------------------
diff --git a/examples/pig/test/test_cql_storage.pig b/examples/pig/test/test_cql_storage.pig
index 3383d4a..822748e 100644
--- a/examples/pig/test/test_cql_storage.pig
+++ b/examples/pig/test/test_cql_storage.pig
@@ -1,14 +1,14 @@
-moretestvalues= LOAD 'cql://cql3ks/moredata/' USING CqlStorage;
+moretestvalues= LOAD 'cql://cql3ks/moredata/' USING CqlNativeStorage;
 insertformat= FOREACH moretestvalues GENERATE TOTUPLE(TOTUPLE('a',x)),TOTUPLE(y);
-STORE insertformat INTO 'cql://cql3ks/test?output_query=UPDATE+cql3ks.test+set+b+%3D+%3F' USING CqlStorage;
+STORE insertformat INTO 'cql://cql3ks/test?output_query=UPDATE+cql3ks.test+set+b+%3D+%3F' USING CqlNativeStorage;
 
 -- composite key
-moredata = load 'cql://cql3ks/compmore' USING CqlStorage;
+moredata = load 'cql://cql3ks/compmore' USING CqlNativeStorage;
 insertformat = FOREACH moredata GENERATE TOTUPLE (TOTUPLE('a',x),TOTUPLE('b',y), TOTUPLE('c',z)),TOTUPLE(data);
-STORE insertformat INTO 'cql://cql3ks/compotable?output_query=UPDATE%20cql3ks.compotable%20SET%20d%20%3D%20%3F' USING CqlStorage;
+STORE insertformat INTO 'cql://cql3ks/compotable?output_query=UPDATE%20cql3ks.compotable%20SET%20d%20%3D%20%3F' USING CqlNativeStorage;
 
 -- collection column
-collectiontable = LOAD 'cql://cql3ks/collectiontable/' USING CqlStorage;
+collectiontable = LOAD 'cql://cql3ks/collectiontable/' USING CqlNativeStorage;
 -- recs= (((m,kk)),((map,(m,mm),(n,nn))))
 recs= FOREACH collectiontable GENERATE TOTUPLE(TOTUPLE('m', m) ), TOTUPLE(TOTUPLE('map', TOTUPLE('m', 'mm'), TOTUPLE('n', 'nn')));
-store recs INTO 'cql://cql3ks/collectiontable?output_query=update+cql3ks.collectiontable+set+n+%3D+%3F' USING CqlStorage();
+store recs INTO 'cql://cql3ks/collectiontable?output_query=update+cql3ks.collectiontable+set+n+%3D+%3F' USING CqlNativeStorage();
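
Note on the STORE locations in the scripts above: the output_query value is a URL-encoded prepared statement, with '=' written as %3D and '?' as %3F, and spaces written as either '+' or %20. A minimal sketch of producing such a location with the JDK's URLEncoder follows; OutputQuerySketch and storeLocation are hypothetical names that are not part of this patch.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for illustration only; it is not part of this patch.
final class OutputQuerySketch
{
    /** build a cql:// STORE location with the prepared statement URL-encoded */
    static String storeLocation(String keyspace, String table, String outputQuery) throws UnsupportedEncodingException
    {
        // URLEncoder uses form encoding: spaces become '+', '=' becomes %3D, '?' becomes %3F
        return "cql://" + keyspace + "/" + table + "?output_query=" + URLEncoder.encode(outputQuery, "UTF-8");
    }

    public static void main(String[] args) throws UnsupportedEncodingException
    {
        // prints cql://cql3ks/test?output_query=UPDATE+cql3ks.test+set+b+%3D+%3F
        System.out.println(storeLocation("cql3ks", "test", "UPDATE cql3ks.test set b = ?"));
    }
}
```

The printed location matches the first STORE statement in test_cql_storage.pig. URLEncoder never emits %20, so the %20 form used in the other scripts would have to be produced some other way.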

http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 035f99a..5884f29 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -683,7 +683,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             if (cassandraStorage)
                 return columnDefs;
 
-            // otherwise for CqlStorage, check metadata for classic thrift tables
+            // otherwise for CqlNativeStorage, check metadata for classic thrift tables
             CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
             for (ColumnDefinition def : cfm.regularAndStaticColumns())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index f0bb8f9..3eb6823 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -19,30 +19,49 @@ package org.apache.cassandra.hadoop.pig;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.Map;
+import java.nio.charset.CharacterCodingException;
+import java.util.*;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.BufferCell;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
-import org.apache.cassandra.thrift.CfDef;
-import org.apache.cassandra.thrift.ColumnDef;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.pig.Expression;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.Expression.OpType;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.Row;
 
-public class CqlNativeStorage extends CqlStorage
+public class CqlNativeStorage extends AbstractCassandraStorage
 {
+    private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class);
+    private int pageSize = 1000;
+    private String columns;
+    private String outputQuery;
+    private String whereClause;
+    private boolean hasCompactValueAlias = false;
+
     private RecordReader<Long, Row> reader;
+    private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
     private String nativePort;
     private String nativeCoreConnections;
     private String nativeMaxConnections;
@@ -72,8 +91,10 @@ public class CqlNativeStorage extends CqlStorage
     /** @param pageSize limit number of CQL rows to fetch in a thrift request */
     public CqlNativeStorage(int pageSize)
     {
-        super(pageSize);
+        super();
+        this.pageSize = pageSize;
         DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlInputFormat";
+        DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlOutputFormat";
     }
 
     public void prepareToRead(RecordReader reader, PigSplit split)
@@ -84,6 +105,11 @@ public class CqlNativeStorage extends CqlStorage
         }
     }
 
+    public void prepareToWrite(RecordWriter writer)
+    {
+        this.writer = writer;
+    }
+
     /** get next row */
     public Tuple getNext() throws IOException
     {
@@ -121,6 +147,421 @@ public class CqlNativeStorage extends CqlStorage
         }
     }
 
+    /** convert a cql column to an object */
+    private Object cqlColumnToObj(Cell col, CfDef cfDef) throws IOException
+    {
+        // standard
+        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+        ByteBuffer cellName = col.name().toByteBuffer();
+        if (validators.get(cellName) == null)
+            return cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.DEFAULT_VALIDATOR), col.value());
+        else
+            return cassandraToObj(validators.get(cellName), col.value());
+    }
+
+    /** set the value to the position of the tuple */
+    private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        if (validator instanceof CollectionType)
+            setCollectionTupleValues(tuple, position, value, validator);
+        else
+           setTupleValue(tuple, position, value);
+    }
+
+    /** set the values of set/list at and after the position of the tuple */
+    private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        if (validator instanceof MapType)
+        {
+            setMapTupleValues(tuple, position, value, validator);
+            return;
+        }
+        AbstractType elementValidator;
+        if (validator instanceof SetType)
+            elementValidator = ((SetType<?>) validator).getElementsType();
+        else if (validator instanceof ListType)
+            elementValidator = ((ListType<?>) validator).getElementsType();
+        else
+            return;
+
+        int i = 0;
+        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
+        for (Object entry : (Collection<?>) value)
+        {
+            setTupleValue(innerTuple, i, cassandraToPigData(entry, elementValidator), elementValidator);
+            i++;
+        }
+        tuple.set(position, innerTuple);
+    }
+
+    /** set the values of set/list at and after the position of the tuple */
+    private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    {
+        AbstractType<?> keyValidator = ((MapType<?, ?>) validator).getKeysType();
+        AbstractType<?> valueValidator = ((MapType<?, ?>) validator).getValuesType();
+
+        int i = 0;
+        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
+        for(Map.Entry<?,?> entry :  ((Map<Object, Object>)value).entrySet())
+        {
+            Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
+            setTupleValue(mapEntryTuple, 0, cassandraToPigData(entry.getKey(), keyValidator), keyValidator);
+            setTupleValue(mapEntryTuple, 1, cassandraToPigData(entry.getValue(), valueValidator), valueValidator);
+            innerTuple.set(i, mapEntryTuple);
+            i++;
+        }
+        tuple.set(position, innerTuple);
+    }
+
+    private Object cassandraToPigData(Object obj, AbstractType validator)
+    {
+        if (validator instanceof DecimalType || validator instanceof InetAddressType)
+            return validator.getString(validator.decompose(obj));
+        return obj;
+    }
+
+    /** include key columns */
+    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
+            throws InvalidRequestException,
+            UnavailableException,
+            TimedOutException,
+            SchemaDisagreementException,
+            TException,
+            CharacterCodingException,
+            org.apache.cassandra.exceptions.InvalidRequestException,
+            ConfigurationException,
+            NotFoundException
+    {
+        List<ColumnDef> keyColumns = null;
+        // get key columns
+        try
+        {
+            keyColumns = getKeysMeta(client);
+        }
+        catch(Exception e)
+        {
+            logger.error("Error in retrieving key columns" , e);
+        }
+
+        // get other columns
+        List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
+
+        // combine all columns in a list
+        if (keyColumns != null && columns != null)
+            keyColumns.addAll(columns);
+
+        return keyColumns;
+    }
+
+    /** get keys meta data */
+    private List<ColumnDef> getKeysMeta(Cassandra.Client client)
+            throws Exception
+    {
+        String query = "SELECT key_aliases, " +
+                "       column_aliases, " +
+                "       key_validator, " +
+                "       comparator, " +
+                "       keyspace_name, " +
+                "       value_alias, " +
+                "       default_validator " +
+                "FROM system.schema_columnfamilies " +
+                "WHERE keyspace_name = '%s'" +
+                "  AND columnfamily_name = '%s' ";
+
+        CqlResult result = client.execute_cql3_query(
+                ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
+                Compression.NONE,
+                ConsistencyLevel.ONE);
+
+        if (result == null || result.rows == null || result.rows.isEmpty())
+            return null;
+
+        Iterator<CqlRow> iteraRow = result.rows.iterator();
+        List<ColumnDef> keys = new ArrayList<ColumnDef>();
+        if (iteraRow.hasNext())
+        {
+            CqlRow cqlRow = iteraRow.next();
+            String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
+            logger.debug("Found ksDef name: {}", name);
+            String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
+
+            logger.debug("partition keys: {}", keyString);
+            List<String> keyNames = FBUtilities.fromJsonList(keyString);
+
+            Iterator<String> iterator = keyNames.iterator();
+            while (iterator.hasNext())
+            {
+                ColumnDef cDef = new ColumnDef();
+                cDef.name = ByteBufferUtil.bytes(iterator.next());
+                keys.add(cDef);
+            }
+            // classic thrift tables
+            if (keys.size() == 0)
+            {
+                CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
+                for (ColumnDefinition def : cfm.partitionKeyColumns())
+                {
+                    String key = def.name.toString();
+                    logger.debug("name: {} ", key);
+                    ColumnDef cDef = new ColumnDef();
+                    cDef.name = ByteBufferUtil.bytes(key);
+                    keys.add(cDef);
+                }
+                for (ColumnDefinition def : cfm.clusteringColumns())
+                {
+                    String key = def.name.toString();
+                    logger.debug("name: {} ", key);
+                    ColumnDef cDef = new ColumnDef();
+                    cDef.name = ByteBufferUtil.bytes(key);
+                    keys.add(cDef);
+                }
+            }
+
+            keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
+
+            logger.debug("cluster keys: {}", keyString);
+            keyNames = FBUtilities.fromJsonList(keyString);
+
+            iterator = keyNames.iterator();
+            while (iterator.hasNext())
+            {
+                ColumnDef cDef = new ColumnDef();
+                cDef.name = ByteBufferUtil.bytes(iterator.next());
+                keys.add(cDef);
+            }
+
+            String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
+            logger.debug("row key validator: {}", validator);
+            AbstractType<?> keyValidator = parseType(validator);
+
+            Iterator<ColumnDef> keyItera = keys.iterator();
+            if (keyValidator instanceof CompositeType)
+            {
+                Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
+                while (typeItera.hasNext())
+                    keyItera.next().validation_class = typeItera.next().toString();
+            }
+            else
+                keyItera.next().validation_class = keyValidator.toString();
+
+            validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
+            logger.debug("cluster key validator: {}", validator);
+
+            if (keyItera.hasNext() && validator != null && !validator.isEmpty())
+            {
+                AbstractType<?> clusterKeyValidator = parseType(validator);
+
+                if (clusterKeyValidator instanceof CompositeType)
+                {
+                    Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
+                    while (keyItera.hasNext())
+                        keyItera.next().validation_class = typeItera.next().toString();
+                }
+                else
+                    keyItera.next().validation_class = clusterKeyValidator.toString();
+            }
+
+            // compact value_alias column
+            if (cqlRow.columns.get(5).value != null)
+            {
+                try
+                {
+                    String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
+                    logger.debug("default validator: {}", compactValidator);
+                    AbstractType<?> defaultValidator = parseType(compactValidator);
+
+                    ColumnDef cDef = new ColumnDef();
+                    cDef.name = cqlRow.columns.get(5).value;
+                    cDef.validation_class = defaultValidator.toString();
+                    keys.add(cDef);
+                    hasCompactValueAlias = true;
+                }
+                catch (Exception e)
+                {
+                    JVMStabilityInspector.inspectThrowable(e);
+                    // no compact column at value_alias
+                }
+            }
+
+        }
+        return keys;
+    }
+
+
+    /** output: (((name, value), (name, value)), (value ... value), (value...value)) */
+    public void putNext(Tuple t) throws IOException
+    {
+        if (t.size() < 1)
+        {
+            // simply nothing here, we can't even delete without a key
+            logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+            return;
+        }
+
+        if (t.getType(0) == DataType.TUPLE)
+        {
+            if (t.getType(1) == DataType.TUPLE)
+            {
+                Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
+                cqlQueryFromTuple(key, t, 1);
+            }
+            else
+                throw new IOException("Second argument in output must be a tuple");
+        }
+        else
+            throw new IOException("First argument in output must be a tuple");
+    }
+
+    /** convert key tuple to key map */
+    private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException
+    {
+        Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>();
+        for (int i = 0; i < t.size(); i++)
+        {
+            if (t.getType(i) == DataType.TUPLE)
+            {
+                Tuple inner = (Tuple) t.get(i);
+                if (inner.size() == 2)
+                {
+                    Object name = inner.get(0);
+                    if (name != null)
+                    {
+                        keys.put(name.toString(), objToBB(inner.get(1)));
+                    }
+                    else
+                        throw new IOException("Key name was empty");
+                }
+                else
+                    throw new IOException("Keys were not in name and value pairs");
+            }
+            else
+            {
+                throw new IOException("keys was not a tuple");
+            }
+        }
+        return keys;
+    }
+
+    /** send CQL query request using data from tuple */
+    private void cqlQueryFromTuple(Map<String, ByteBuffer> key, Tuple t, int offset) throws IOException
+    {
+        for (int i = offset; i < t.size(); i++)
+        {
+            if (t.getType(i) == DataType.TUPLE)
+            {
+                Tuple inner = (Tuple) t.get(i);
+                if (inner.size() > 0)
+                {
+                    List<ByteBuffer> bindedVariables = bindedVariablesFromTuple(inner);
+                    if (bindedVariables.size() > 0)
+                        sendCqlQuery(key, bindedVariables);
+                    else
+                        throw new IOException("Missing binded variables");
+                }
+            }
+            else
+            {
+                throw new IOException("Output type was not a tuple");
+            }
+        }
+    }
+
+    /** compose a list of binded variables */
+    private List<ByteBuffer> bindedVariablesFromTuple(Tuple t) throws IOException
+    {
+        List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
+        for (int i = 0; i < t.size(); i++)
+            variables.add(objToBB(t.get(i)));
+        return variables;
+    }
+
+    /** writer write the data by executing CQL query */
+    private void sendCqlQuery(Map<String, ByteBuffer> key, List<ByteBuffer> bindedVariables) throws IOException
+    {
+        try
+        {
+            writer.write(key, bindedVariables);
+        }
+        catch (InterruptedException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    /** schema: (value, value, value) where keys are in the front. */
+    public ResourceSchema getSchema(String location, Job job) throws IOException
+    {
+        setLocation(location, job);
+        CfInfo cfInfo = getCfInfo(loadSignature);
+        CfDef cfDef = cfInfo.cfDef;
+        // top-level schema, no type
+        ResourceSchema schema = new ResourceSchema();
+
+        // get default marshallers and validators
+        Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+        Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfDef);
+
+        // will contain all fields for this schema
+        List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
+
+        for (ColumnDef cdef : cfDef.column_metadata)
+        {
+            ResourceFieldSchema valSchema = new ResourceFieldSchema();
+            AbstractType validator = validators.get(cdef.name);
+            if (validator == null)
+                validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
+            valSchema.setName(new String(cdef.getName()));
+            valSchema.setType(getPigType(validator));
+            allSchemaFields.add(valSchema);
+        }
+
+        // top level schema contains everything
+        schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
+        return schema;
+    }
+
+    public void setPartitionFilter(Expression partitionFilter) throws IOException
+    {
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
+        property.setProperty(PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
+    }
+
+    /**
+     * Return cql where clauses for the corresponding partition filter. Make sure the data format matches
+     * Only support the following Pig data types: int, long, float, double, boolean and chararray
+     * */
+    private String partitionFilterToWhereClauseString(Expression expression) throws IOException
+    {
+        Expression.BinaryExpression be = (Expression.BinaryExpression) expression;
+        OpType op = expression.getOpType();
+        String opString = op.toString();
+        switch (op)
+        {
+            case OP_EQ:
+                opString = " = ";
+            case OP_GE:
+            case OP_GT:
+            case OP_LE:
+            case OP_LT:
+                String name = be.getLhs().toString();
+                String value = be.getRhs().toString();
+                return String.format("%s %s %s", name, opString, value);
+            case OP_AND:
+                return String.format("%s AND %s", partitionFilterToWhereClauseString(be.getLhs()), partitionFilterToWhereClauseString(be.getRhs()));
+            default:
+                throw new IOException("Unsupported expression type: " + opString);
+        }
+    }
+
+    /** retrieve where clause for partition filter */
+    private String getWhereClauseForPartitionFilter()
+    {
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
+        return property.getProperty(PARTITION_FILTER_SIGNATURE);
+    }
+
     /** set read configuration settings */
     public void setLocation(String location, Job job) throws IOException
     {
@@ -190,6 +631,16 @@ public class CqlNativeStorage extends CqlStorage
         if (whereClause != null)
             CqlConfigHelper.setInputWhereClauses(conf, whereClause);
 
+        String whereClauseForPartitionFilter = getWhereClauseForPartitionFilter();
+        String wc = whereClause != null && !whereClause.trim().isEmpty() 
+                               ? whereClauseForPartitionFilter == null ? whereClause: String.format("%s AND %s", whereClause.trim(), whereClauseForPartitionFilter)
+                               : whereClauseForPartitionFilter;
+
+        if (wc != null)
+        {
+            logger.debug("where clause: {}", wc);
+            CqlConfigHelper.setInputWhereClauses(conf, wc);
+        } 
         if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
         {
             try
@@ -212,6 +663,44 @@ public class CqlNativeStorage extends CqlStorage
         initSchema(loadSignature);
     }
 
+    /** set store configuration settings */
+    public void setStoreLocation(String location, Job job) throws IOException
+    {
+        conf = HadoopCompat.getConfiguration(job);
+        setLocationFromUri(location);
+
+        if (username != null && password != null)
+            ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
+        if (splitSize > 0)
+            ConfigHelper.setInputSplitSize(conf, splitSize);
+        if (partitionerClass!= null)
+            ConfigHelper.setOutputPartitioner(conf, partitionerClass);
+        if (rpcPort != null)
+        {
+            ConfigHelper.setOutputRpcPort(conf, rpcPort);
+            ConfigHelper.setInputRpcPort(conf, rpcPort);
+        }
+        if (initHostAddress != null)
+        {
+            ConfigHelper.setOutputInitialAddress(conf, initHostAddress);
+            ConfigHelper.setInputInitialAddress(conf, initHostAddress);
+        }
+
+        ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
+        CqlConfigHelper.setOutputCql(conf, outputQuery);
+
+        setConnectionInformation();
+
+        if (ConfigHelper.getOutputRpcPort(conf) == 0)
+            throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
+        if (ConfigHelper.getOutputInitialAddress(conf) == null)
+            throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+        if (ConfigHelper.getOutputPartitioner(conf) == null)
+            throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+
+        initSchema(storeSignature);
+    }
+
     private void setLocationFromUri(String location) throws IOException
     {
         try
@@ -320,4 +809,11 @@ public class CqlNativeStorage extends CqlStorage
         }
     }
 
+    /**
+     * Thrift API can't handle null, so use empty byte array
+     */
+    public ByteBuffer nullToBB()
+    {
+        return ByteBuffer.wrap(new byte[0]);
+    }
 }
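
Note on the setLocation hunk above: the added ternary combines the user-supplied where_clause with the clause derived from the Pig partition filter. The following is a minimal standalone sketch of that combination logic, not code from the patch. The class name WhereClauseMerge and the method merge are hypothetical, introduced only for illustration; the branching mirrors the ternary as it appears in the diff, including returning the user clause untrimmed when there is no partition filter.

```java
public class WhereClauseMerge
{
    /**
     * Hypothetical helper mirroring the ternary added to setLocation.
     * Either argument may be null.
     */
    public static String merge(String whereClause, String partitionFilterClause)
    {
        boolean hasUserClause = whereClause != null && !whereClause.trim().isEmpty();

        // No usable user clause: fall back to the partition filter (may be null).
        if (!hasUserClause)
            return partitionFilterClause;

        // User clause only: returned as-is, as in the diff.
        if (partitionFilterClause == null)
            return whereClause;

        // Both present: join with AND.
        return String.format("%s AND %s", whereClause.trim(), partitionFilterClause);
    }

    public static void main(String[] args)
    {
        System.out.println(merge("a = 1", "b > 2")); // a = 1 AND b > 2
        System.out.println(merge("   ", "b > 2"));   // b > 2
        System.out.println(merge("a = 1", null));    // a = 1
    }
}
```

When both inputs are absent the result is null, which is why the patch guards the call to setInputWhereClauses with a null check.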

http://git-wip-us.apache.org/repos/asf/cassandra/blob/359d3bb2/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 08926fa..c7277fa 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -17,708 +17,25 @@
  */
 package org.apache.cassandra.hadoop.pig;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.*;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.BufferCell;
-import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.hadoop.*;
-import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.pig.Expression;
-import org.apache.pig.Expression.OpType;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.*;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 /**
- * A LoadStoreFunc for retrieving data from and storing data to Cassandra
- *
- * A row from a standard CF will be returned as nested tuples: 
- * (((key1, value1), (key2, value2)), ((name1, val1), (name2, val2))).
+ * @deprecated use CqlNativeStorage instead. CqlStorage will be removed.
  */
-public class CqlStorage extends AbstractCassandraStorage
+public class CqlStorage extends CqlNativeStorage
 {
-    private static final Logger logger = LoggerFactory.getLogger(CqlStorage.class);
-    private RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> reader;
-    protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
+    private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class);
 
-    protected int pageSize = 1000;
-    protected String columns;
-    protected String outputQuery;
-    protected String whereClause;
-    private boolean hasCompactValueAlias = false;
-        
     public CqlStorage()
     {
         this(1000);
+        logger.warn("CqlStorage is deprecated and will be removed in the next release, use CqlNativeStorage instead.");
     }
 
     /** @param pageSize limit number of CQL rows to fetch in a thrift request */
     public CqlStorage(int pageSize)
     {
-        super();
-        this.pageSize = pageSize;
-        DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlInputFormat";
-        DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlOutputFormat";
-    }   
-
-    public void prepareToRead(RecordReader reader, PigSplit split)
-    {
-        this.reader = reader;
-    }
-
-    /** get next row */
-    public Tuple getNext() throws IOException
-    {
-        try
-        {
-            // load the next pair
-            if (!reader.nextKeyValue())
-                return null;
-
-            CfInfo cfInfo = getCfInfo(loadSignature);
-            CfDef cfDef = cfInfo.cfDef;
-            Map<String, ByteBuffer> keys = reader.getCurrentKey();
-            Map<String, ByteBuffer> columns = reader.getCurrentValue();
-            assert keys != null && columns != null;
-
-            // add key columns to the map
-            for (Map.Entry<String,ByteBuffer> key : keys.entrySet())
-                columns.put(key.getKey(), key.getValue());
-
-            Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size());
-            Iterator<ColumnDef> itera = cfDef.column_metadata.iterator();
-            int i = 0;
-            while (itera.hasNext())
-            {
-                ColumnDef cdef = itera.next();
-                ByteBuffer columnValue = columns.get(ByteBufferUtil.string(cdef.name.duplicate()));
-                if (columnValue != null)
-                {
-                    Cell cell = new BufferCell(CellNames.simpleDense(cdef.name), columnValue);
-                    AbstractType<?> validator = getValidatorMap(cfDef).get(cdef.name);
-                    setTupleValue(tuple, i, cqlColumnToObj(cell, cfDef), validator);
-                }
-                else
-                    tuple.set(i, null);
-                i++;
-            }
-            return tuple;
-        }
-        catch (InterruptedException e)
-        {
-            throw new IOException(e.getMessage());
-        }
-    }
-
-    /** set the value to the position of the tuple */
-    protected void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
-    {
-        if (validator instanceof CollectionType)
-            setCollectionTupleValues(tuple, position, value, validator);
-        else
-           setTupleValue(tuple, position, value);
-    }
-
-    /** set the values of set/list at and after the position of the tuple */
-    private void setCollectionTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
-    {
-        if (validator instanceof MapType)
-        {
-            setMapTupleValues(tuple, position, value, validator);
-            return;
-        }
-        AbstractType elementValidator;
-        if (validator instanceof SetType)
-            elementValidator = ((SetType<?>) validator).getElementsType();
-        else if (validator instanceof ListType)
-            elementValidator = ((ListType<?>) validator).getElementsType();
-        else 
-            return;
-        
-        int i = 0;
-        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
-        for (Object entry : (Collection<?>) value)
-        {
-            setTupleValue(innerTuple, i, cassandraToPigData(entry, elementValidator), elementValidator);
-            i++;
-        }
-        tuple.set(position, innerTuple);
-    }
-
-    /** set the values of set/list at and after the position of the tuple */
-    private void setMapTupleValues(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
-    {
-        AbstractType<?> keyValidator = ((MapType<?, ?>) validator).getKeysType();
-        AbstractType<?> valueValidator = ((MapType<?, ?>) validator).getValuesType();
-        
-        int i = 0;
-        Tuple innerTuple = TupleFactory.getInstance().newTuple(((Map<?,?>) value).size());
-        for(Map.Entry<?,?> entry :  ((Map<Object, Object>)value).entrySet())
-        {
-            Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
-            setTupleValue(mapEntryTuple, 0, cassandraToPigData(entry.getKey(), keyValidator), keyValidator);
-            setTupleValue(mapEntryTuple, 1, cassandraToPigData(entry.getValue(), valueValidator), valueValidator);
-            innerTuple.set(i, mapEntryTuple);
-            i++;
-        }
-        tuple.set(position, innerTuple);
-    }
-
-    /** convert a cql column to an object */
-    protected Object cqlColumnToObj(Cell col, CfDef cfDef) throws IOException
-    {
-        // standard
-        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-        ByteBuffer cellName = col.name().toByteBuffer();
-        if (validators.get(cellName) == null)
-            return cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.DEFAULT_VALIDATOR), col.value());
-        else
-            return cassandraToObj(validators.get(cellName), col.value());
-    }
-
-    /** set read configuration settings */
-    public void setLocation(String location, Job job) throws IOException
-    {
-        conf = HadoopCompat.getConfiguration(job);
-        setLocationFromUri(location);
-
-        if (username != null && password != null)
-            ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
-        if (splitSize > 0)
-            ConfigHelper.setInputSplitSize(conf, splitSize);
-        if (partitionerClass!= null)
-            ConfigHelper.setInputPartitioner(conf, partitionerClass);
-        if (rpcPort != null)
-            ConfigHelper.setInputRpcPort(conf, rpcPort);
-        if (initHostAddress != null)
-            ConfigHelper.setInputInitialAddress(conf, initHostAddress);
-
-        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
-        setConnectionInformation();
-
-        CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
-        if (columns != null && !columns.trim().isEmpty())
-            CqlConfigHelper.setInputColumns(conf, columns);
-
-        String whereClauseForPartitionFilter = getWhereClauseForPartitionFilter();
-        String wc = whereClause != null && !whereClause.trim().isEmpty() 
-                               ? whereClauseForPartitionFilter == null ? whereClause: String.format("%s AND %s", whereClause.trim(), whereClauseForPartitionFilter)
-                               : whereClauseForPartitionFilter;
-
-        if (wc != null)
-        {
-            logger.debug("where clause: {}", wc);
-            CqlConfigHelper.setInputWhereClauses(conf, wc);
-        } 
-
-        if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
-        {
-            try
-            {
-                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
-            }
-            catch (NumberFormatException e)
-            {
-                throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e);
-            }           
-        }
-
-        if (ConfigHelper.getInputRpcPort(conf) == 0)
-            throw new IOException("PIG_INPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
-        if (ConfigHelper.getInputInitialAddress(conf) == null)
-            throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
-        if (ConfigHelper.getInputPartitioner(conf) == null)
-            throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
-        if (loadSignature == null)
-            loadSignature = location;
-
-        initSchema(loadSignature);
-    }
-
-    /** set store configuration settings */
-    public void setStoreLocation(String location, Job job) throws IOException
-    {
-        conf = HadoopCompat.getConfiguration(job);
-        setLocationFromUri(location);
-
-        if (username != null && password != null)
-            ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
-        if (splitSize > 0)
-            ConfigHelper.setInputSplitSize(conf, splitSize);
-        if (partitionerClass!= null)
-            ConfigHelper.setOutputPartitioner(conf, partitionerClass);
-        if (rpcPort != null)
-        {
-            ConfigHelper.setOutputRpcPort(conf, rpcPort);
-            ConfigHelper.setInputRpcPort(conf, rpcPort);
-        }
-        if (initHostAddress != null)
-        {
-            ConfigHelper.setOutputInitialAddress(conf, initHostAddress);
-            ConfigHelper.setInputInitialAddress(conf, initHostAddress);
-        }
-
-        ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
-        CqlConfigHelper.setOutputCql(conf, outputQuery);
-
-        setConnectionInformation();
-
-        if (ConfigHelper.getOutputRpcPort(conf) == 0)
-            throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
-        if (ConfigHelper.getOutputInitialAddress(conf) == null)
-            throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
-        if (ConfigHelper.getOutputPartitioner(conf) == null)
-            throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
-
-        initSchema(storeSignature);
-    }
-    
-    /** schema: (value, value, value) where keys are in the front. */
-    public ResourceSchema getSchema(String location, Job job) throws IOException
-    {
-        setLocation(location, job);
-        CfInfo cfInfo = getCfInfo(loadSignature);
-        CfDef cfDef = cfInfo.cfDef;
-        // top-level schema, no type
-        ResourceSchema schema = new ResourceSchema();
-
-        // get default marshallers and validators
-        Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
-        Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfDef);
-
-        // will contain all fields for this schema
-        List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
-
-        for (ColumnDef cdef : cfDef.column_metadata)
-        {
-            ResourceFieldSchema valSchema = new ResourceFieldSchema();
-            AbstractType validator = validators.get(cdef.name);
-            if (validator == null)
-                validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
-            valSchema.setName(new String(cdef.getName()));
-            valSchema.setType(getPigType(validator));
-            allSchemaFields.add(valSchema);
-        }
-
-        // top level schema contains everything
-        schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
-        return schema;
-    }
-
-    public void setPartitionFilter(Expression partitionFilter) throws IOException
-    {
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        property.setProperty(PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
-    }
-
-    /** retrieve where clause for partition filter */
-    private String getWhereClauseForPartitionFilter()
-    {
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        return property.getProperty(PARTITION_FILTER_SIGNATURE);
-    }
-    
-    public void prepareToWrite(RecordWriter writer)
-    {
-        this.writer = writer;
-    }
-
-    /** output: (((name, value), (name, value)), (value ... value), (value...value)) */
-    public void putNext(Tuple t) throws IOException
-    {
-        if (t.size() < 1)
-        {
-            // simply nothing here, we can't even delete without a key
-            logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
-            return;
-        }
-
-        if (t.getType(0) == DataType.TUPLE)
-        {
-            if (t.getType(1) == DataType.TUPLE)
-            {
-                Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
-                cqlQueryFromTuple(key, t, 1);
-            }
-            else
-                throw new IOException("Second argument in output must be a tuple");
-        }
-        else
-            throw new IOException("First argument in output must be a tuple");
-    }
-
-    /** convert key tuple to key map */
-    private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException
-    {
-        Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>();
-        for (int i = 0; i < t.size(); i++)
-        {
-            if (t.getType(i) == DataType.TUPLE)
-            {
-                Tuple inner = (Tuple) t.get(i);
-                if (inner.size() == 2)
-                {
-                    Object name = inner.get(0);
-                    if (name != null)
-                    {
-                        keys.put(name.toString(), objToBB(inner.get(1)));
-                    }
-                    else
-                        throw new IOException("Key name was empty");
-                }
-                else
-                    throw new IOException("Keys were not in name and value pairs");
-            }
-            else
-            {
-                throw new IOException("keys was not a tuple");
-            }
-        }
-        return keys;
-    }
-
-    /** send CQL query request using data from tuple */
-    private void cqlQueryFromTuple(Map<String, ByteBuffer> key, Tuple t, int offset) throws IOException
-    {
-        for (int i = offset; i < t.size(); i++)
-        {
-            if (t.getType(i) == DataType.TUPLE)
-            {
-                Tuple inner = (Tuple) t.get(i);
-                if (inner.size() > 0)
-                {
-                    
-                    List<ByteBuffer> bindedVariables = bindedVariablesFromTuple(inner);
-                    if (bindedVariables.size() > 0)
-                        sendCqlQuery(key, bindedVariables);
-                    else
-                        throw new IOException("Missing binded variables");
-                }
-            }
-            else
-            {
-                throw new IOException("Output type was not a tuple");
-            }
-        }
-    }
-
-    /** compose a list of binded variables */
-    private List<ByteBuffer> bindedVariablesFromTuple(Tuple t) throws IOException
-    {
-        List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
-        for (int i = 0; i < t.size(); i++)
-            variables.add(objToBB(t.get(i)));
-        return variables;
-    }
-
-    /** writer write the data by executing CQL query */
-    private void sendCqlQuery(Map<String, ByteBuffer> key, List<ByteBuffer> bindedVariables) throws IOException
-    {
-        try
-        {
-            writer.write(key, bindedVariables);
-        }
-        catch (InterruptedException e)
-        {
-            throw new IOException(e);
-        }
-    }
-    
-    /** include key columns */
-    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
-            throws InvalidRequestException,
-            UnavailableException,
-            TimedOutException,
-            SchemaDisagreementException,
-            TException,
-            CharacterCodingException,
-            org.apache.cassandra.exceptions.InvalidRequestException,
-            ConfigurationException,
-            NotFoundException
-    {
-        List<ColumnDef> keyColumns = null;
-        // get key columns
-        try
-        {
-            keyColumns = getKeysMeta(client);
-        }
-        catch(Exception e)
-        {
-            logger.error("Error in retrieving key columns" , e);   
-        }
-
-        // get other columns
-        List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
-
-        // combine all columns in a list
-        if (keyColumns != null && columns != null)
-            keyColumns.addAll(columns);
-
-        return keyColumns;
-    }
-
-    /** get keys meta data */
-    protected List<ColumnDef> getKeysMeta(Cassandra.Client client)
-            throws Exception
-    {
-        String query = "SELECT key_aliases, " +
-                "       column_aliases, " +
-                "       key_validator, " +
-                "       comparator, " +
-                "       keyspace_name, " +
-                "       value_alias, " +
-                "       default_validator " +
-                "FROM system.schema_columnfamilies " +
-                "WHERE keyspace_name = '%s'" +
-                "  AND columnfamily_name = '%s' ";
-
-        CqlResult result = client.execute_cql3_query(
-                ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
-                Compression.NONE,
-                ConsistencyLevel.ONE);
-
-        if (result == null || result.rows == null || result.rows.isEmpty())
-            return null;
-
-        Iterator<CqlRow> iteraRow = result.rows.iterator();
-        List<ColumnDef> keys = new ArrayList<ColumnDef>();
-        if (iteraRow.hasNext())
-        {
-            CqlRow cqlRow = iteraRow.next();
-            String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
-            logger.debug("Found ksDef name: {}", name);
-            String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
-
-            logger.debug("partition keys: {}", keyString);
-            List<String> keyNames = FBUtilities.fromJsonList(keyString);
-
-            Iterator<String> iterator = keyNames.iterator();
-            while (iterator.hasNext())
-            {
-                ColumnDef cDef = new ColumnDef();
-                cDef.name = ByteBufferUtil.bytes(iterator.next());
-                keys.add(cDef);
-            }
-            // classic thrift tables
-            if (keys.size() == 0)
-            {
-                CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
-                for (ColumnDefinition def : cfm.partitionKeyColumns())
-                {
-                    String key = def.name.toString();
-                    logger.debug("name: {} ", key);
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = ByteBufferUtil.bytes(key);
-                    keys.add(cDef);
-                }
-                for (ColumnDefinition def : cfm.clusteringColumns())
-                {
-                    String key = def.name.toString();
-                    logger.debug("name: {} ", key);
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = ByteBufferUtil.bytes(key);
-                    keys.add(cDef);
-                }
-            }
-
-            keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
-
-            logger.debug("cluster keys: {}", keyString);
-            keyNames = FBUtilities.fromJsonList(keyString);
-
-            iterator = keyNames.iterator();
-            while (iterator.hasNext())
-            {
-                ColumnDef cDef = new ColumnDef();
-                cDef.name = ByteBufferUtil.bytes(iterator.next());
-                keys.add(cDef);
-            }
-
-            String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
-            logger.debug("row key validator: {}", validator);
-            AbstractType<?> keyValidator = parseType(validator);
-
-            Iterator<ColumnDef> keyItera = keys.iterator();
-            if (keyValidator instanceof CompositeType)
-            {
-                Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
-                while (typeItera.hasNext())
-                    keyItera.next().validation_class = typeItera.next().toString();
-            }
-            else
-                keyItera.next().validation_class = keyValidator.toString();
-
-            validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
-            logger.debug("cluster key validator: {}", validator);
-
-            if (keyItera.hasNext() && validator != null && !validator.isEmpty())
-            {
-                AbstractType<?> clusterKeyValidator = parseType(validator);
-
-                if (clusterKeyValidator instanceof CompositeType)
-                {
-                    Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
-                    while (keyItera.hasNext())
-                        keyItera.next().validation_class = typeItera.next().toString();
-                }
-                else
-                    keyItera.next().validation_class = clusterKeyValidator.toString();
-            }
-
-            // compact value_alias column
-            if (cqlRow.columns.get(5).value != null)
-            {
-                try
-                {
-                    String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
-                    logger.debug("default validator: {}", compactValidator);
-                    AbstractType<?> defaultValidator = parseType(compactValidator);
-
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = cqlRow.columns.get(5).value;
-                    cDef.validation_class = defaultValidator.toString();
-                    keys.add(cDef);
-                    hasCompactValueAlias = true;
-                }
-                catch (Exception e)
-                {
-                    JVMStabilityInspector.inspectThrowable(e);
-                    // no compact column at value_alias
-                }
-            }
-
-        }
-        return keys;
-    }
-
-    /** cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>]
-     * [&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>]
-     * [&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]] */
-    private void setLocationFromUri(String location) throws IOException
-    {
-        try
-        {
-            if (!location.startsWith("cql://"))
-                throw new Exception("Bad scheme: " + location);
-
-            String[] urlParts = location.split("\\?");
-            if (urlParts.length > 1)
-            {
-                Map<String, String> urlQuery = getQueryMap(urlParts[1]);
-
-                // each page row size
-                if (urlQuery.containsKey("page_size"))
-                    pageSize = Integer.parseInt(urlQuery.get("page_size"));
-
-                // input query select columns
-                if (urlQuery.containsKey("columns"))
-                    columns = urlQuery.get("columns");
-
-                // output prepared statement
-                if (urlQuery.containsKey("output_query"))
-                    outputQuery = urlQuery.get("output_query");
-
-                // user defined where clause
-                if (urlQuery.containsKey("where_clause"))
-                    whereClause = urlQuery.get("where_clause");
-
-                //split size
-                if (urlQuery.containsKey("split_size"))
-                    splitSize = Integer.parseInt(urlQuery.get("split_size"));
-                if (urlQuery.containsKey("partitioner"))
-                    partitionerClass = urlQuery.get("partitioner");
-                if (urlQuery.containsKey("use_secondary"))
-                    usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
-                if (urlQuery.containsKey("init_address"))
-                    initHostAddress = urlQuery.get("init_address");
-                if (urlQuery.containsKey("rpc_port"))
-                    rpcPort = urlQuery.get("rpc_port");
-            }
-            String[] parts = urlParts[0].split("/+");
-            String[] credentialsAndKeyspace = parts[1].split("@");
-            if (credentialsAndKeyspace.length > 1)
-            {
-                String[] credentials = credentialsAndKeyspace[0].split(":");
-                username = credentials[0];
-                password = credentials[1];
-                keyspace = credentialsAndKeyspace[1];
-            }
-            else
-            {
-                keyspace = parts[1];
-            }
-            column_family = parts[2];
-        }
-        catch (Exception e)
-        {
-            throw new IOException("Expected 'cql://[username:password@]<keyspace>/<columnfamily>" +
-                    "[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]" +
-                    "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]" +
-                    "[&init_address=<host>][&rpc_port=<port>]]': " + e.getMessage());
-        }
-    }
-
-    /** 
-     * Return cql where clauses for the corresponding partition filter. Make sure the data format matches 
-     * Only support the following Pig data types: int, long, float, double, boolean and chararray
-     * */
-    private String partitionFilterToWhereClauseString(Expression expression) throws IOException
-    {
-        Expression.BinaryExpression be = (Expression.BinaryExpression) expression;
-        OpType op = expression.getOpType();
-        String opString = op.toString();
-        switch (op)
-        {
-            case OP_EQ:
-                opString = " = ";
-            case OP_GE:
-            case OP_GT:
-            case OP_LE:
-            case OP_LT:
-                String name = be.getLhs().toString();
-                String value = be.getRhs().toString();
-                return String.format("%s %s %s", name, opString, value);
-            case OP_AND:
-                return String.format("%s AND %s", partitionFilterToWhereClauseString(be.getLhs()), partitionFilterToWhereClauseString(be.getRhs()));
-            default:
-                throw new IOException("Unsupported expression type: " + opString);
-        }
-    }
-
-    private Object cassandraToPigData(Object obj, AbstractType validator)
-    {
-        if (validator instanceof DecimalType || validator instanceof InetAddressType)
-            return validator.getString(validator.decompose(obj));
-        return obj;
-    }
-
-    /**
-     * Thrift API can't handle null, so use empty byte array
-     */
-    public ByteBuffer nullToBB()
-    {
-        return ByteBuffer.wrap(new byte[0]);
+        super(pageSize);
     }
 }
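
Note on the CqlStorage.java diff above: after this change the deprecated class is reduced to a thin subclass that delegates to its replacement and logs a warning from the no-argument constructor. The sketch below illustrates that shim pattern in isolation, under stated assumptions. NativeStorage, LegacyStorage and the warnings counter are hypothetical stand-ins, not the Cassandra classes, and System.err replaces the slf4j logger so the example has no dependencies.

```java
public class DeprecationShimDemo
{
    /** Hypothetical stand-in for the replacement class. */
    public static class NativeStorage
    {
        protected final int pageSize;

        public NativeStorage(int pageSize)
        {
            this.pageSize = pageSize;
        }

        public int pageSize()
        {
            return pageSize;
        }
    }

    /** @deprecated use NativeStorage instead. */
    @Deprecated
    public static class LegacyStorage extends NativeStorage
    {
        // Counts emitted warnings so the behaviour can be checked in a test.
        public static int warnings = 0;

        public LegacyStorage()
        {
            this(1000);
            // As in the diff, only the no-argument constructor warns.
            warnings++;
            System.err.println("LegacyStorage is deprecated, use NativeStorage instead.");
        }

        public LegacyStorage(int pageSize)
        {
            super(pageSize);
        }
    }

    public static void main(String[] args)
    {
        NativeStorage viaShim = new LegacyStorage();
        System.out.println(viaShim.pageSize()); // 1000
    }
}
```

One consequence of this shape, visible in the sketch, is that callers using the constructor that takes a page size get no warning; whether that is intended in the patch is not stated in the commit message.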