You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/06/08 21:43:41 UTC

[2/4] cassandra git commit: Remove deprecated legacy Hadoop code

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
deleted file mode 100644
index 7bf43ef..0000000
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ /dev/null
@@ -1,1397 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop.pig;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.ColumnFamilyRecordReader;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.hadoop.HadoopCompat;
-import org.apache.cassandra.schema.LegacySchemaTables;
-import org.apache.cassandra.serializers.CollectionSerializer;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Hex;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.pig.Expression;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceStatistics;
-import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.*;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-
-/**
- * A LoadStoreFunc for retrieving data from and storing data to Cassandra
- *
- * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
- */
-@Deprecated
-public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
-{
-    // Names of the environment variables consulted by this class:
-    // PIG_ALLOW_DELETES is read in setConnectionInformation(), PIG_WIDEROW_INPUT in
-    // setLocation(), and PIG_USE_SECONDARY in both setLocation() and setStoreLocation().
-    public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
-    public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
-    public final static String PIG_USE_SECONDARY = "PIG_USE_SECONDARY";
-
-    // empty buffer used as the default value for both slice bounds below
-    private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-    private static final Logger logger = LoggerFactory.getLogger(CassandraStorage.class);
-
-    // arguments of the SliceRange built in setLocation() when no predicate is configured yet
-    private ByteBuffer slice_start = BOUND;
-    private ByteBuffer slice_end = BOUND;
-    private boolean slice_reverse = false;
-    // when false, mutationFromTuple() rejects null values instead of turning them into deletions
-    private boolean allow_deletes = false;
-
-    // assigned by prepareToRead() / prepareToWrite()
-    private RecordReader<ByteBuffer, Map<ByteBuffer, ColumnFamilyRecordReader.Column>> reader;
-    private RecordWriter<ByteBuffer, List<Mutation>> writer;
-
-    // when true, getNext() delegates to getNextWide()
-    private boolean widerows = false;
-    // number of columns to fetch in a slice (set by the constructor)
-    private int limit;
-
-    // assigned in the constructor; used by setConnectionInformation() when no format is given via environment
-    protected String DEFAULT_INPUT_FORMAT;
-    protected String DEFAULT_OUTPUT_FORMAT;
-
-    protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
-
-    // NOTE(review): these are not assigned anywhere in the visible part of the file;
-    // presumably they are populated by setLocationFromUri() - confirm.
-    protected String username;
-    protected String password;
-    protected String keyspace;
-    protected String column_family;
-    protected String loadSignature;
-    protected String storeSignature;
-
-    protected Configuration conf;
-    protected String inputFormatClass;
-    protected String outputFormatClass;
-    protected int splitSize = 64 * 1024;
-    protected String partitionerClass;
-    // when true, indexed columns are additionally exposed as top-level fields (see getNext()/getSchema())
-    protected boolean usePartitionFilter = false;
-    protected String initHostAddress;
-    protected String rpcPort;
-    protected int nativeProtocolVersion = 1;
-    
-    // wide row hacks: key/row read ahead by getNextWide() and held over for its next call
-    private ByteBuffer lastKey;
-    private Map<ByteBuffer, ColumnFamilyRecordReader.Column> lastRow;
-
-    /** Creates a storage instance that fetches up to 1024 columns per slice. */
-    public CassandraStorage()
-    {
-        this(1024);
-    }
-
-    /**
-     * Creates a storage instance with an explicit slice limit and installs the
-     * default input/output format class names.
-     *
-     * @param limit number of columns to fetch in a slice
-     */
-    public CassandraStorage(int limit)
-    {
-        super();
-        DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
-        DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
-        this.limit = limit;
-    }
-
-    /** @return the number of columns fetched per slice, as given to the constructor */
-    public int getLimit()
-    {
-        return this.limit;
-    }
-
-    /**
-     * Remembers the record reader that getNext() and getNextWide() pull rows from.
-     *
-     * @param reader the record reader to read from
-     * @param split  the split being processed (unused here)
-     */
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split)
-    {
-        // raw type comes from the overridden signature; the field is typed
-        this.reader = reader;
-    }
-
-    /**
-     * Reads the next wide row.
-     *
-     * Keeps pulling key/value pairs from the reader and collects their columns into one bag
-     * for as long as the key stays the same. When the key changes, the pair that was read
-     * ahead is stashed in lastKey/lastRow and emitted or prepended on a later call.
-     *
-     * @return a tuple holding the key followed by a bag of column tuples, or null once the
-     *         reader is exhausted and nothing is pending
-     * @throws IOException if the reader fails or the read is interrupted
-     */
-    public Tuple getNextWide() throws IOException
-    {
-        CfDef cfDef = getCfDef(loadSignature);
-        ByteBuffer key = null;
-        Tuple tuple = null; 
-        DefaultDataBag bag = new DefaultDataBag();
-        try
-        {
-            while(true)
-            {
-                boolean hasNext = reader.nextKeyValue();
-                if (!hasNext)
-                {
-                    // reader exhausted: flush whatever is still pending, every path below returns
-                    if (tuple == null)
-                        tuple = TupleFactory.getInstance().newTuple();
-
-                    if (lastRow != null)
-                    {
-                        if (tuple.size() == 0) // lastRow is a new one
-                        {
-                            // NOTE(review): the key is taken from the reader rather than from lastKey;
-                            // presumably both refer to the held-over row here - confirm.
-                            key = reader.getCurrentKey();
-                            tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
-                        }
-                        for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
-                        {
-                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
-                        }
-                        lastKey = null;
-                        lastRow = null;
-                        tuple.append(bag);
-                        return tuple;
-                    }
-                    else
-                    {
-                        if (tuple.size() == 1) // rare case of just one wide row, key already set
-                        {
-                            tuple.append(bag);
-                            return tuple;
-                        }
-                        else
-                            return null;
-                    }
-                }
-                if (key != null && !(reader.getCurrentKey()).equals(key)) // key changed
-                {
-                    // read too much, hold on to it for next time
-                    lastKey = reader.getCurrentKey();
-                    lastRow = reader.getCurrentValue();
-                    // but return what we have so far
-                    tuple.append(bag);
-                    return tuple;
-                }
-                if (key == null) // only set the key on the first iteration
-                {
-                    key = reader.getCurrentKey();
-                    if (lastKey != null && !(key.equals(lastKey))) // last key only had one value
-                    {
-                        // emit the held-over row on its own and hold the pair just read instead
-                        if (tuple == null)
-                            tuple = keyToTuple(lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
-                        else
-                            addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
-                        for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
-                        {
-                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
-                        }
-                        tuple.append(bag);
-                        lastKey = key;
-                        lastRow = reader.getCurrentValue();
-                        return tuple;
-                    }
-                    if (tuple == null)
-                        tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
-                    else
-                        // NOTE(review): this passes lastKey rather than key, and tuple appears to be
-                        // non-null here only if the reader returned a null key earlier - confirm intent.
-                        addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
-                }
-                SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column> row =
-                    (SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>)reader.getCurrentValue();
-                if (lastRow != null) // prepend what was read last time
-                {
-                    for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : lastRow.entrySet())
-                    {
-                        bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
-                    }
-                    lastKey = null;
-                    lastRow = null;
-                }
-                for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : row.entrySet())
-                {
-                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
-                }
-            }
-        }
-        catch (InterruptedException e)
-        {
-            // NOTE(review): only the message is propagated; the cause and the interrupt flag are dropped
-            throw new IOException(e.getMessage());
-        }
-    }
-
-    /**
-     * Reads the next row and converts it to a Pig tuple.
-     *
-     * The tuple holds the key, then one (name, value) tuple per column listed in the
-     * column family metadata (an empty tuple when the row lacks that column), then a bag
-     * with all remaining columns. When usePartitionFilter is set, the value of every
-     * indexed column is appended once more as a top-level field.
-     *
-     * In wide-row mode this delegates to getNextWide().
-     *
-     * @return the next tuple, or null when the reader is exhausted
-     * @throws IOException if the reader fails or the read is interrupted
-     */
-    public Tuple getNext() throws IOException
-    {
-        if (widerows)
-            return getNextWide();
-        try
-        {
-            // load the next pair
-            if (!reader.nextKeyValue())
-                return null;
-
-            CfDef cfDef = getCfDef(loadSignature);
-            ByteBuffer key = reader.getCurrentKey();
-            Map<ByteBuffer, ColumnFamilyRecordReader.Column> cf = reader.getCurrentValue();
-            assert key != null && cf != null;
-
-            // output tuple, will hold the key, each indexed column in a tuple, then a bag of the rest
-            // NOTE: we're setting the tuple size here only for the key so we can use setTupleValue on it
-            Tuple tuple = keyToTuple(key, cfDef, parseType(cfDef.getKey_validation_class()));
-            DefaultDataBag bag = new DefaultDataBag();
-
-            // we must add all the indexed columns first to match the schema;
-            // remember their names so they are not repeated in the bag
-            Set<ByteBuffer> added = new HashSet<ByteBuffer>(cfDef.column_metadata.size());
-            // take care to iterate these in the same order as the schema does
-            for (ColumnDef cdef : cfDef.column_metadata)
-            {
-                boolean hasColumn = false;
-                boolean cql3Table = false;
-                try
-                {
-                    hasColumn = cf.containsKey(cdef.name);
-                }
-                catch (Exception ignored)
-                {
-                    // deliberate best-effort: a failing lookup is taken to mean a CQL3 table,
-                    // for which no placeholder tuple is emitted
-                    cql3Table = true;
-                }
-                if (hasColumn)
-                {
-                    tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
-                }
-                else if (!cql3Table)
-                {   // otherwise, we need to add an empty tuple to take its place
-                    tuple.append(TupleFactory.getInstance().newTuple());
-                }
-                added.add(cdef.name);
-            }
-            // now add all the other columns
-            for (Map.Entry<ByteBuffer, ColumnFamilyRecordReader.Column> entry : cf.entrySet())
-            {
-                if (!added.contains(entry.getKey()))
-                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
-            }
-            tuple.append(bag);
-            // finally, special top-level indexes if needed
-            if (usePartitionFilter)
-            {
-                for (ColumnDef cdef : getIndexes())
-                {
-                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
-                    tuple.append(throwaway.get(1));
-                }
-            }
-            return tuple;
-        }
-        catch (InterruptedException e)
-        {
-            // restore the interrupt flag for the caller and keep the cause attached
-            Thread.currentThread().interrupt();
-            throw new IOException(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * Writes the next row.
-     *
-     * Accepted output shape:
-     *     (key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional)
-     * For supers, we only accept the original output.
-     *
-     * @param t the tuple to write; field 0 is the row key, field 1 must be a tuple or a bag
-     * @throws IOException if the second field is missing or is neither a tuple nor a bag,
-     *                     or if anything follows a bag in second position
-     */
-    public void putNext(Tuple t) throws IOException
-    {
-        if (t.size() < 1)
-        {
-            // simply nothing here, we can't even delete without a key
-            logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
-            return;
-        }
-        // a key-only tuple has no field 1 to inspect: fail with the explicit message
-        // instead of an index error from t.getType(1)
-        if (t.size() < 2)
-            throw new IOException("Second argument in output must be a tuple or bag");
-
-        ByteBuffer key = objToBB(t.get(0));
-        byte secondType = t.getType(1);
-        if (secondType == DataType.TUPLE)
-            writeColumnsFromTuple(key, t, 1);
-        else if (secondType == DataType.BAG)
-        {
-            if (t.size() > 2)
-                throw new IOException("No arguments allowed after bag");
-            writeColumnsFromBag(key, (DataBag) t.get(1));
-        }
-        else
-            throw new IOException("Second argument in output must be a tuple or bag");
-    }
-
-    /**
-     * Applies the connection settings to the Hadoop configuration and resolves the
-     * input/output format classes and the allow-deletes flag from the environment.
-     *
-     * @throws IOException declared for the delegated helper call
-     */
-    protected void setConnectionInformation() throws IOException
-    {
-        StorageHelper.setConnectionInformation(conf);
-
-        String inputFormat = System.getenv(StorageHelper.PIG_INPUT_FORMAT);
-        inputFormatClass = inputFormat == null
-                         ? DEFAULT_INPUT_FORMAT
-                         : getFullyQualifiedClassName(inputFormat);
-
-        String outputFormat = System.getenv(StorageHelper.PIG_OUTPUT_FORMAT);
-        outputFormatClass = outputFormat == null
-                          ? DEFAULT_OUTPUT_FORMAT
-                          : getFullyQualifiedClassName(outputFormat);
-
-        // leave the flag untouched when the variable is absent
-        String allowDeletes = System.getenv(PIG_ALLOW_DELETES);
-        if (allowDeletes != null)
-            allow_deletes = Boolean.parseBoolean(allowDeletes);
-    }
-
-    /**
-     * Qualifies a bare class name with the Cassandra Hadoop package.
-     *
-     * @param classname a class name, with or without a package
-     * @return the name unchanged if it contains a dot, otherwise prefixed with
-     *         "org.apache.cassandra.hadoop."
-     */
-    protected String getFullyQualifiedClassName(String classname)
-    {
-        if (classname.indexOf('.') >= 0)
-            return classname;
-        return "org.apache.cassandra.hadoop." + classname;
-    }
-
-    /**
-     * Sets the read configuration settings.
-     *
-     * Parses the location, installs a default slice predicate if none is configured, applies
-     * the environment overrides and the explicitly set fields to the job configuration,
-     * validates that port, initial address and partitioner are present, and finally loads
-     * the column family schema.
-     *
-     * @param location the location string, handed to setLocationFromUri()
-     * @param job      the job whose configuration is populated
-     * @throws IOException if PIG_INPUT_SPLIT_SIZE is not a number or a required setting is missing
-     */
-    public void setLocation(String location, Job job) throws IOException
-    {
-        conf = HadoopCompat.getConfiguration(job);
-        setLocationFromUri(location);
-
-        // only install the default slice predicate when none has been configured yet
-        if (ConfigHelper.getInputSlicePredicate(conf) == null)
-        {
-            SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit);
-            SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
-            ConfigHelper.setInputSlicePredicate(conf, predicate);
-        }
-        if (System.getenv(PIG_WIDEROW_INPUT) != null)
-            widerows = Boolean.parseBoolean(System.getenv(PIG_WIDEROW_INPUT));
-        if (System.getenv(PIG_USE_SECONDARY) != null)
-            usePartitionFilter = Boolean.parseBoolean(System.getenv(PIG_USE_SECONDARY));
-        if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null)
-        {
-            try
-            {
-                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE)));
-            }
-            catch (NumberFormatException e)
-            {
-                throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e);
-            }           
-        } 
-
-        if (usePartitionFilter && getIndexExpressions() != null)
-            ConfigHelper.setInputRange(conf, getIndexExpressions());
-
-        if (username != null && password != null)
-            ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
-
-        // NOTE(review): splitSize defaults to 64 * 1024, so unless setLocationFromUri() changes it
-        // to a non-positive value this overwrites the PIG_INPUT_SPLIT_SIZE value set above - confirm.
-        if (splitSize > 0)
-            ConfigHelper.setInputSplitSize(conf, splitSize);
-        if (partitionerClass!= null)
-            ConfigHelper.setInputPartitioner(conf, partitionerClass);
-        if (rpcPort != null)
-            ConfigHelper.setInputRpcPort(conf, rpcPort);
-        if (initHostAddress != null)
-            ConfigHelper.setInputInitialAddress(conf, initHostAddress);
-
-        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
-        setConnectionInformation();
-
-        // fail early when a mandatory connection setting is still missing
-        if (ConfigHelper.getInputRpcPort(conf) == 0)
-            throw new IOException("PIG_INPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
-        if (ConfigHelper.getInputInitialAddress(conf) == null)
-            throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
-        if (ConfigHelper.getInputPartitioner(conf) == null)
-            throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
-        // fall back to the location string as the key under which the schema is cached
-        if (loadSignature == null)
-            loadSignature = location;
-        initSchema(loadSignature);
-    }
-
-    /**
-     * Sets the store configuration settings.
-     *
-     * Parses the location, applies the explicitly set fields to the job configuration,
-     * validates that output port, initial address and partitioner are present, and finally
-     * loads the column family schema.
-     *
-     * @param location the location string, handed to setLocationFromUri()
-     * @param job      the job whose configuration is populated
-     * @throws IOException if a required output setting is missing
-     */
-    public void setStoreLocation(String location, Job job) throws IOException
-    {
-        conf = HadoopCompat.getConfiguration(job);
-        
-        // don't combine mappers to a single mapper per node
-        conf.setBoolean("pig.noSplitCombination", true);
-        setLocationFromUri(location);
-
-        if (username != null && password != null)
-            ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
-        // NOTE(review): this sets the *input* split size on the store path - confirm it is intended
-        if (splitSize > 0)
-            ConfigHelper.setInputSplitSize(conf, splitSize);
-        if (partitionerClass!= null)
-            ConfigHelper.setOutputPartitioner(conf, partitionerClass);
-        // port and address are mirrored to the input side as well; initSchema() below obtains
-        // its client via ConfigHelper.getClientFromInputAddressList(conf)
-        if (rpcPort != null)
-        {
-            ConfigHelper.setOutputRpcPort(conf, rpcPort);
-            ConfigHelper.setInputRpcPort(conf, rpcPort);
-        }
-        if (initHostAddress != null)
-        {
-            ConfigHelper.setOutputInitialAddress(conf, initHostAddress);
-            ConfigHelper.setInputInitialAddress(conf, initHostAddress);
-        }
-
-        ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
-        setConnectionInformation();
-
-        // fail early when a mandatory connection setting is still missing
-        if (ConfigHelper.getOutputRpcPort(conf) == 0)
-            throw new IOException("PIG_OUTPUT_RPC_PORT or PIG_RPC_PORT environment variable not set");
-        if (ConfigHelper.getOutputInitialAddress(conf) == null)
-            throw new IOException("PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
-        if (ConfigHelper.getOutputPartitioner(conf) == null)
-            throw new IOException("PIG_OUTPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
-
-        // we have to do this again here for the check in writeColumnsFromTuple
-        if (System.getenv(PIG_USE_SECONDARY) != null)
-            usePartitionFilter = Boolean.parseBoolean(System.getenv(PIG_USE_SECONDARY));
-
-        // NOTE(review): unlike setLocation(), there is no fallback when storeSignature is null;
-        // presumably it is always assigned before this call - confirm.
-        initSchema(storeSignature);
-    }
-
-    /**
-     * Fetches the column family definition from Cassandra and caches its string form in
-     * the UDF properties under {@code signature}. Does nothing if an entry for the
-     * signature already exists.
-     *
-     * @param signature the property key under which the schema is cached
-     * @throws IOException if authentication fails, the table does not exist in the keyspace,
-     *                     or the schema cannot be retrieved
-     */
-    protected void initSchema(String signature) throws IOException
-    {
-        Properties properties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class);
-
-        // Only get the schema if we haven't already gotten it
-        if (properties.containsKey(signature))
-            return;
-
-        try
-        {
-            Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
-            client.set_keyspace(keyspace);
-
-            if (username != null && password != null)
-            {
-                Map<String, String> credentials = new HashMap<String, String>(2);
-                credentials.put(PasswordAuthenticator.USERNAME_KEY, username);
-                credentials.put(PasswordAuthenticator.PASSWORD_KEY, password);
-
-                try
-                {
-                    client.login(new AuthenticationRequest(credentials));
-                }
-                catch (AuthenticationException e)
-                {
-                    logger.error("Authentication exception: invalid username and/or password");
-                    throw new IOException(e);
-                }
-            }
-
-            // compose the CfDef for the column family
-            CfDef cfDef = getCfDef(client);
-            if (cfDef == null)
-                throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
-                                                    column_family,
-                                                    keyspace));
-
-            properties.setProperty(signature, cfdefToString(cfDef));
-        }
-        catch (IOException e)
-        {
-            // already the declared type: pass it on as-is rather than wrapping it a second time
-            throw e;
-        }
-        catch (Exception e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    /**
-     * Accepts any schema without inspecting it.
-     *
-     * @param schema the schema of the data to be stored (ignored)
-     */
-    public void checkSchema(ResourceSchema schema) throws IOException
-    {
-        // nothing to verify: types do not matter, every value gets cast to a ByteBuffer
-    }
-
-    /**
-     * Defines the Pig schema for the column family at {@code location}.
-     *
-     * The returned schema looks like this:
-     *     (key, index1:(name, value), index2:(name, value), columns:{(name, value)})
-     * Columns that have metadata are returned as named tuples, unknown columns go into the
-     * bag. This way, wide rows can still be handled by the bag, but known columns can
-     * easily be referenced. In wide-row mode the named tuples are omitted. When
-     * usePartitionFilter is set, one extra "index_&lt;name&gt;" field per indexed column is
-     * appended after the bag.
-     *
-     * @param location the location string, handed to setLocation()
-     * @param job      the job, handed to setLocation()
-     * @return the schema, or null for super column families
-     * @throws IOException if the location cannot be set up or the schema cannot be built
-     */
-    public ResourceSchema getSchema(String location, Job job) throws IOException
-    {
-        setLocation(location, job);
-        CfDef cfDef = getCfDef(loadSignature);
-        // constant-first comparison: no NullPointerException if the column type is unset
-        if ("Super".equals(cfDef.column_type))
-            return null;
-
-        // get default marshallers and validators
-        Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
-        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-        AbstractType defaultValidator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
-
-        // add key
-        ResourceFieldSchema keyFieldSchema =
-            newFieldSchema("key", StorageHelper.getPigType(marshallers.get(MarshallerType.KEY_VALIDATOR)));
-
-        byte comparatorType = StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR));
-
-        // inside the bag, place one tuple with the default comparator/validator schema
-        ResourceFieldSchema bagTupleField =
-            newNameValueTupleSchema(null, comparatorType, StorageHelper.getPigType(defaultValidator));
-        ResourceSchema bagSchema = new ResourceSchema();
-        bagSchema.setFields(new ResourceFieldSchema[] { bagTupleField });
-        ResourceFieldSchema bagField = newFieldSchema("columns", DataType.BAG);
-        bagField.setSchema(bagSchema);
-
-        // will contain all fields for this schema:
-        // the key first, then the indexed columns, and finally the bag
-        List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
-        allSchemaFields.add(keyFieldSchema);
-
-        if (!widerows)
-        {
-            // defined validators/indexes: one named (name, value) tuple per col/val pair
-            for (ColumnDef cdef : cfDef.column_metadata)
-            {
-                AbstractType validator = validatorOrDefault(validators, cdef.name, defaultValidator);
-                allSchemaFields.add(newNameValueTupleSchema(columnName(cdef),
-                                                            comparatorType,
-                                                            StorageHelper.getPigType(validator)));
-            }
-        }
-
-        // bag at the end for unknown columns
-        allSchemaFields.add(bagField);
-
-        // add top-level index elements if needed
-        if (usePartitionFilter)
-        {
-            for (ColumnDef cdef : getIndexes())
-            {
-                AbstractType validator = validatorOrDefault(validators, cdef.name, defaultValidator);
-                allSchemaFields.add(newFieldSchema("index_" + columnName(cdef),
-                                                   StorageHelper.getPigType(validator)));
-            }
-        }
-
-        // top level schema contains everything
-        ResourceSchema schema = new ResourceSchema();
-        schema.setFields(allSchemaFields.toArray(new ResourceFieldSchema[allSchemaFields.size()]));
-        return schema;
-    }
-
-    /** Decodes a column name as UTF-8 instead of relying on the platform default charset. */
-    private static String columnName(ColumnDef cdef)
-    {
-        return new String(cdef.getName(), java.nio.charset.StandardCharsets.UTF_8);
-    }
-
-    /** Returns the validator registered for the column name, or the fallback if there is none. */
-    private static AbstractType validatorOrDefault(Map<ByteBuffer,AbstractType> validators,
-                                                   ByteBuffer name,
-                                                   AbstractType fallback)
-    {
-        AbstractType validator = validators.get(name);
-        return validator == null ? fallback : validator;
-    }
-
-    /** Creates a field schema with the given name and Pig type. */
-    private static ResourceFieldSchema newFieldSchema(String name, byte type)
-    {
-        ResourceFieldSchema field = new ResourceFieldSchema();
-        field.setName(name);
-        field.setType(type);
-        return field;
-    }
-
-    /** Creates a tuple field holding a "name" and a "value" field; the tuple is left unnamed if tupleName is null. */
-    private static ResourceFieldSchema newNameValueTupleSchema(String tupleName, byte nameType, byte valueType) throws IOException
-    {
-        ResourceSchema tupleSchema = new ResourceSchema();
-        tupleSchema.setFields(new ResourceFieldSchema[] { newFieldSchema("name", nameType),
-                                                          newFieldSchema("value", valueType) });
-        ResourceFieldSchema tupleField = new ResourceFieldSchema();
-        tupleField.setType(DataType.TUPLE);
-        tupleField.setSchema(tupleSchema);
-        if (tupleName != null)
-            tupleField.setName(tupleName);
-        return tupleField;
-    }
-
-    /**
-     * Converts the partition filter to index expressions and stores their string form in
-     * the UDF properties under the partition filter signature.
-     *
-     * @param partitionFilter the filter expression pushed down by Pig
-     */
-    public void setPartitionFilter(Expression partitionFilter) throws IOException
-    {
-        Properties udfProperties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class);
-        String serialized = indexExpressionsToString(filterToIndexExpressions(partitionFilter));
-        udfProperties.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, serialized);
-    }
-
-    /**
-     * Remembers the record writer that receives the mutations.
-     *
-     * @param writer the record writer to write to
-     */
-    public void prepareToWrite(RecordWriter writer)
-    {
-        // raw type comes from the interface signature; the field is typed
-        this.writer = writer;
-    }
-
-    /**
-     * Converts a Pig value to a ByteBuffer.
-     *
-     * Strings, boxed numbers and UUIDs are serialized directly. A tuple whose first element
-     * is the string "set", "list" or "map" (case-insensitive) is packed as that collection
-     * from its remaining elements; any other tuple is encoded as a composite. Everything
-     * else is cast to DataByteArray.
-     *
-     * @param o the value to convert, may be null
-     * @return the serialized value
-     */
-    protected ByteBuffer objToBB(Object o)
-    {
-        if (o == null)
-            return nullToBB();
-        else if (o instanceof java.lang.String)
-            return ByteBuffer.wrap(new DataByteArray((String)o).get());
-        else if (o instanceof Integer)
-            return Int32Type.instance.decompose((Integer)o);
-        else if (o instanceof Long)
-            return LongType.instance.decompose((Long)o);
-        else if (o instanceof Float)
-            return FloatType.instance.decompose((Float)o);
-        else if (o instanceof Double)
-            return DoubleType.instance.decompose((Double)o);
-        else if (o instanceof UUID)
-            return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
-        else if (!(o instanceof Tuple))
-            return ByteBuffer.wrap(((DataByteArray) o).get());
-
-        List<Object> elements = ((Tuple)o).getAll();
-        Object head = elements.isEmpty() ? null : elements.get(0);
-        if (head instanceof String)
-        {
-            // collections: the leading string names the collection kind
-            String collectionType = (String) head;
-            List<Object> members = elements.subList(1, elements.size());
-            if ("set".equalsIgnoreCase(collectionType) || "list".equalsIgnoreCase(collectionType))
-                return objToListOrSetBB(members);
-            if ("map".equalsIgnoreCase(collectionType))
-                return objToMapBB(members);
-        }
-        return objToCompositeBB(elements);
-    }
-
-    /**
-     * Serializes the elements of a list or set and packs them into one buffer.
-     *
-     * @param objects the collection elements
-     * @return the packed collection
-     */
-    private ByteBuffer objToListOrSetBB(List<Object> objects)
-    {
-        int count = objects.size();
-        List<ByteBuffer> elements = new ArrayList<ByteBuffer>(count);
-        for (Object element : objects)
-            elements.add(objToBB(element));
-
-        // NOTE: using protocol v1 serialization format for collections so as to not break
-        // compatibility. Not sure if that's the right thing.
-        return CollectionSerializer.pack(elements, count, 1);
-    }
-
-    /**
-     * Serializes map entries and packs them into one buffer. Every entry is a tuple whose
-     * fields are serialized in order.
-     *
-     * @param objects the map entries, each one a Tuple
-     * @return the packed map
-     */
-    private ByteBuffer objToMapBB(List<Object> objects)
-    {
-        int entryCount = objects.size();
-        List<ByteBuffer> flattened = new ArrayList<ByteBuffer>(entryCount * 2);
-        for (Object entry : objects)
-        {
-            Tuple pair = (Tuple) entry;
-            for (Object part : pair.getAll())
-                flattened.add(objToBB(part));
-        }
-        // NOTE: using protocol v1 serialization format for collections so as to not break
-        // compatibility. Not sure if that's the right thing.
-        return CollectionSerializer.pack(flattened, entryCount, 1);
-    }
-
-    /**
-     * Encodes the objects as one composite value: every component is written as a
-     * two-byte big-endian length, the component bytes, and a trailing zero byte.
-     *
-     * @param objects the components, in order
-     * @return the encoded composite, positioned at its start
-     */
-    private ByteBuffer objToCompositeBB(List<Object> objects)
-    {
-        List<ByteBuffer> components = new ArrayList<ByteBuffer>(objects.size());
-        int capacity = 0;
-        for (Object component : objects)
-        {
-            ByteBuffer encoded = objToBB(component);
-            components.add(encoded);
-            // 2 length bytes + payload + 1 terminator byte
-            capacity += encoded.remaining() + 3;
-        }
-
-        ByteBuffer composite = ByteBuffer.allocate(capacity);
-        for (ByteBuffer encoded : components)
-        {
-            // a freshly allocated buffer is big-endian, so this writes the high byte, then the low byte
-            composite.putShort((short) encoded.remaining());
-            composite.put(encoded);
-            composite.put((byte) 0);
-        }
-        composite.flip();
-        return composite;
-    }
-
-    /**
-     * Writes the fields of a tuple, starting at {@code offset}, to Cassandra. Bags are
-     * written right away; non-empty inner tuples are collected as mutations and written
-     * together at the end.
-     *
-     * @param key    the row key
-     * @param t      the output tuple
-     * @param offset index of the first field to write
-     * @throws IOException if a field is neither a bag nor a tuple while usePartitionFilter is off
-     */
-    private void writeColumnsFromTuple(ByteBuffer key, Tuple t, int offset) throws IOException
-    {
-        ArrayList<Mutation> pending = new ArrayList<Mutation>();
-        int fieldCount = t.size();
-        for (int index = offset; index < fieldCount; index++)
-        {
-            byte fieldType = t.getType(index);
-            if (fieldType == DataType.BAG)
-            {
-                writeColumnsFromBag(key, (DataBag) t.get(index));
-                continue;
-            }
-            if (fieldType == DataType.TUPLE)
-            {
-                Tuple column = (Tuple) t.get(index);
-                // may be empty, for an indexed column that wasn't present
-                if (column.size() > 0)
-                    pending.add(mutationFromTuple(column));
-                continue;
-            }
-            if (!usePartitionFilter)
-                throw new IOException("Output type was not a bag or a tuple");
-        }
-        if (!pending.isEmpty())
-            writeMutations(key, pending);
-    }
-
-    /** compose Cassandra mutation from tuple */
-    private Mutation mutationFromTuple(Tuple t) throws IOException
-    {
-        Mutation mutation = new Mutation();
-        if (t.get(1) == null)
-        {
-            if (allow_deletes)
-            {
-                mutation.deletion = new Deletion();
-                mutation.deletion.predicate = new org.apache.cassandra.thrift.SlicePredicate();
-                mutation.deletion.predicate.column_names = Arrays.asList(objToBB(t.get(0)));
-                mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
-            }
-            else
-                throw new IOException("null found but deletes are disabled, set " + PIG_ALLOW_DELETES +
-                    "=true in environment or allow_deletes=true in URL to enable");
-        }
-        else
-        {
-            org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
-            column.setName(objToBB(t.get(0)));
-            column.setValue(objToBB(t.get(1)));
-            column.setTimestamp(FBUtilities.timestampMicros());
-            mutation.column_or_supercolumn = new ColumnOrSuperColumn();
-            mutation.column_or_supercolumn.column = column;
-        }
-        return mutation;
-    }
-
-    /** write bag data to Cassandra */
-    private void writeColumnsFromBag(ByteBuffer key, DataBag bag) throws IOException
-    {
-        List<Mutation> mutationList = new ArrayList<Mutation>();
-        for (Tuple pair : bag)
-        {
-            Mutation mutation = new Mutation();
-            if (DataType.findType(pair.get(1)) == DataType.BAG) // supercolumn
-            {
-                SuperColumn sc = new SuperColumn();
-                sc.setName(objToBB(pair.get(0)));
-                List<org.apache.cassandra.thrift.Column> columns = new ArrayList<org.apache.cassandra.thrift.Column>();
-                for (Tuple subcol : (DataBag) pair.get(1))
-                {
-                    org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
-                    column.setName(objToBB(subcol.get(0)));
-                    column.setValue(objToBB(subcol.get(1)));
-                    column.setTimestamp(FBUtilities.timestampMicros());
-                    columns.add(column);
-                }
-                if (columns.isEmpty())
-                {
-                    if (allow_deletes)
-                    {
-                        mutation.deletion = new Deletion();
-                        mutation.deletion.super_column = objToBB(pair.get(0));
-                        mutation.deletion.setTimestamp(FBUtilities.timestampMicros());
-                    }
-                    else
-                        throw new IOException("SuperColumn deletion attempted with empty bag, but deletes are disabled, set " +
-                            PIG_ALLOW_DELETES + "=true in environment or allow_deletes=true in URL to enable");
-                }
-                else
-                {
-                    sc.columns = columns;
-                    mutation.column_or_supercolumn = new ColumnOrSuperColumn();
-                    mutation.column_or_supercolumn.super_column = sc;
-                }
-            }
-            else
-                mutation = mutationFromTuple(pair);
-            mutationList.add(mutation);
-            // for wide rows, we need to limit the amount of mutations we write at once
-            if (mutationList.size() >= 10) // arbitrary, CFOF will re-batch this up, and BOF won't care
-            {
-                writeMutations(key, mutationList);
-                mutationList.clear();
-            }
-        }
-        // write the last batch
-        if (mutationList.size() > 0)
-            writeMutations(key, mutationList);
-    }
-
-    /** write mutation to Cassandra */
-    private void writeMutations(ByteBuffer key, List<Mutation> mutations) throws IOException
-    {
-        try
-        {
-            writer.write(key, mutations);
-        }
-        catch (InterruptedException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    /** get a list of columns with defined index*/
-    protected List<ColumnDef> getIndexes() throws IOException
-    {
-        CfDef cfdef = getCfDef(loadSignature);
-        List<ColumnDef> indexes = new ArrayList<ColumnDef>();
-        for (ColumnDef cdef : cfdef.column_metadata)
-        {
-            if (cdef.index_type != null)
-                indexes.add(cdef);
-        }
-        return indexes;
-    }
-
-    /** get a list of Cassandra IndexExpression from Pig expression */
-    private List<IndexExpression> filterToIndexExpressions(Expression expression) throws IOException
-    {
-        List<IndexExpression> indexExpressions = new ArrayList<IndexExpression>();
-        Expression.BinaryExpression be = (Expression.BinaryExpression)expression;
-        ByteBuffer name = ByteBuffer.wrap(be.getLhs().toString().getBytes());
-        ByteBuffer value = ByteBuffer.wrap(be.getRhs().toString().getBytes());
-        switch (expression.getOpType())
-        {
-            case OP_EQ:
-                indexExpressions.add(new IndexExpression(name, IndexOperator.EQ, value));
-                break;
-            case OP_GE:
-                indexExpressions.add(new IndexExpression(name, IndexOperator.GTE, value));
-                break;
-            case OP_GT:
-                indexExpressions.add(new IndexExpression(name, IndexOperator.GT, value));
-                break;
-            case OP_LE:
-                indexExpressions.add(new IndexExpression(name, IndexOperator.LTE, value));
-                break;
-            case OP_LT:
-                indexExpressions.add(new IndexExpression(name, IndexOperator.LT, value));
-                break;
-            case OP_AND:
-                indexExpressions.addAll(filterToIndexExpressions(be.getLhs()));
-                indexExpressions.addAll(filterToIndexExpressions(be.getRhs()));
-                break;
-            default:
-                throw new IOException("Unsupported expression type: " + expression.getOpType().name());
-        }
-        return indexExpressions;
-    }
-
-    /** convert a list of index expression to string */
-    private static String indexExpressionsToString(List<IndexExpression> indexExpressions) throws IOException
-    {
-        assert indexExpressions != null;
-        // oh, you thought cfdefToString was awful?
-        IndexClause indexClause = new IndexClause();
-        indexClause.setExpressions(indexExpressions);
-        indexClause.setStart_key("".getBytes());
-        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
-        try
-        {
-            return Hex.bytesToHex(serializer.serialize(indexClause));
-        }
-        catch (TException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    /** convert string to a list of index expression */
-    private static List<IndexExpression> indexExpressionsFromString(String ie) throws IOException
-    {
-        assert ie != null;
-        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
-        IndexClause indexClause = new IndexClause();
-        try
-        {
-            deserializer.deserialize(indexClause, Hex.hexToBytes(ie));
-        }
-        catch (TException e)
-        {
-            throw new IOException(e);
-        }
-        return indexClause.getExpressions();
-    }
-
-    public ResourceStatistics getStatistics(String location, Job job)
-    {
-        return null;
-    }
-
-    public void cleanupOnFailure(String failure, Job job)
-    {
-    }
-
-    public void cleanupOnSuccess(String location, Job job) throws IOException {
-    }
-
-
-    /** StoreFunc methods */
-    public void setStoreFuncUDFContextSignature(String signature)
-    {
-        this.storeSignature = signature;
-    }
-
-    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
-    {
-        return relativeToAbsolutePath(location, curDir);
-    }
-
-    /** output format */
-    public OutputFormat getOutputFormat() throws IOException
-    {
-        try
-        {
-            return FBUtilities.construct(outputFormatClass, "outputformat");
-        }
-        catch (ConfigurationException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-
-    @Override
-    public InputFormat getInputFormat() throws IOException
-    {
-        try
-        {
-            return FBUtilities.construct(inputFormatClass, "inputformat");
-        }
-        catch (ConfigurationException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    /** get a list of index expression */
-    private List<IndexExpression> getIndexExpressions() throws IOException
-    {
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
-        if (property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE) != null)
-            return indexExpressionsFromString(property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE));
-        else
-            return null;
-    }
-
-    /** get a list of column for the column family */
-    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
-    throws TException, CharacterCodingException, InvalidRequestException, ConfigurationException
-    {   
-        return getColumnMeta(client, true, true);
-    }
-
-
-    /** get column meta data */
-    protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
-            throws TException,
-            CharacterCodingException,
-            ConfigurationException
-    {
-        String query = String.format("SELECT column_name, validator, index_type, type " +
-                        "FROM %s.%s " +
-                        "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
-                SystemKeyspace.NAME,
-                LegacySchemaTables.COLUMNS,
-                keyspace,
-                column_family);
-
-        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
-        List<CqlRow> rows = result.rows;
-        List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
-        if (rows == null || rows.isEmpty())
-        {
-            // if CassandraStorage, just return the empty list
-            if (cassandraStorage)
-                return columnDefs;
-
-            // otherwise for CqlNativeStorage, check metadata for classic thrift tables
-            CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
-            for (ColumnDefinition def : cfm.regularAndStaticColumns())
-            {
-                ColumnDef cDef = new ColumnDef();
-                String columnName = def.name.toString();
-                String type = def.type.toString();
-                logger.debug("name: {}, type: {} ", columnName, type);
-                cDef.name = ByteBufferUtil.bytes(columnName);
-                cDef.validation_class = type;
-                columnDefs.add(cDef);
-            }
-            // we may not need to include the value column for compact tables as we
-            // could have already processed it as schema_columnfamilies.value_alias
-            if (columnDefs.size() == 0 && includeCompactValueColumn && cfm.compactValueColumn() != null)
-            {
-                ColumnDefinition def = cfm.compactValueColumn();
-                if ("value".equals(def.name.toString()))
-                {
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = def.name.bytes;
-                    cDef.validation_class = def.type.toString();
-                    columnDefs.add(cDef);
-                }
-            }
-            return columnDefs;
-        }
-
-        Iterator<CqlRow> iterator = rows.iterator();
-        while (iterator.hasNext())
-        {
-            CqlRow row = iterator.next();
-            ColumnDef cDef = new ColumnDef();
-            String type = ByteBufferUtil.string(row.getColumns().get(3).value);
-            if (!type.equals("regular"))
-                continue;
-            cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value));
-            cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value);
-            ByteBuffer indexType = row.getColumns().get(2).value;
-            if (indexType != null)
-                cDef.index_type = getIndexType(ByteBufferUtil.string(indexType));
-            columnDefs.add(cDef);
-        }
-        return columnDefs;
-    }
-
-
-    /** get CFMetaData of a column family */
-    protected CFMetaData getCFMetaData(String ks, String cf, Cassandra.Client client)
-            throws TException, ConfigurationException
-    {
-        KsDef ksDef = client.describe_keyspace(ks);
-        for (CfDef cfDef : ksDef.cf_defs)
-        {
-            if (cfDef.name.equalsIgnoreCase(cf))
-                return ThriftConversion.fromThrift(cfDef);
-        }
-        return null;
-    }
-
-    /** get index type from string */
-    protected IndexType getIndexType(String type)
-    {
-        type = type.toLowerCase();
-        if ("keys".equals(type))
-            return IndexType.KEYS;
-        else if("custom".equals(type))
-            return IndexType.CUSTOM;
-        else if("composites".equals(type))
-            return IndexType.COMPOSITES;
-        else
-            return null;
-    }
-
-    /** return partition keys */
-    public String[] getPartitionKeys(String location, Job job) throws IOException
-    {
-        if (!usePartitionFilter)
-            return null;
-        List<ColumnDef> indexes = getIndexes();
-        String[] partitionKeys = new String[indexes.size()];
-        for (int i = 0; i < indexes.size(); i++)
-        {
-            partitionKeys[i] = new String(indexes.get(i).getName());
-        }
-        return partitionKeys;
-    }
-
-    /** convert key to a tuple */
-    private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
-    {
-        Tuple tuple = TupleFactory.getInstance().newTuple(1);
-        addKeyToTuple(tuple, key, cfDef, comparator);
-        return tuple;
-    }
-
-    /** add key to a tuple */
-    private void addKeyToTuple(Tuple tuple, ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
-    {
-        if( comparator instanceof AbstractCompositeType )
-        {
-            StorageHelper.setTupleValue(tuple, 0, composeComposite((AbstractCompositeType) comparator, key));
-        }
-        else
-        {
-            StorageHelper.setTupleValue(tuple, 0, StorageHelper.cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key, nativeProtocolVersion));
-        }
-
-    }
-
-    /** Deconstructs a composite type to a Tuple. */
-    protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException
-    {
-        List<AbstractCompositeType.CompositeComponent> result = comparator.deconstruct(name);
-        Tuple t = TupleFactory.getInstance().newTuple(result.size());
-        for (int i=0; i<result.size(); i++)
-            StorageHelper.setTupleValue(t, i, StorageHelper.cassandraToObj(result.get(i).comparator, result.get(i).value, nativeProtocolVersion));
-
-        return t;
-    }
-
-    /** cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>
-     * [&reversed=true][&limit=1][&allow_deletes=true][&widerows=true]
-     * [&use_secondary=true][&comparator=<comparator>][&partitioner=<partitioner>]]*/
-    private void setLocationFromUri(String location) throws IOException
-    {
-        try
-        {
-            if (!location.startsWith("cassandra://"))
-                throw new Exception("Bad scheme." + location);
-            
-            String[] urlParts = location.split("\\?");
-            if (urlParts.length > 1)
-            {
-                Map<String, String> urlQuery = getQueryMap(urlParts[1]);
-                AbstractType comparator = BytesType.instance;
-                if (urlQuery.containsKey("comparator"))
-                    comparator = TypeParser.parse(urlQuery.get("comparator"));
-                if (urlQuery.containsKey("slice_start"))
-                    slice_start = comparator.fromString(urlQuery.get("slice_start"));
-                if (urlQuery.containsKey("slice_end"))
-                    slice_end = comparator.fromString(urlQuery.get("slice_end"));
-                if (urlQuery.containsKey("reversed"))
-                    slice_reverse = Boolean.parseBoolean(urlQuery.get("reversed"));
-                if (urlQuery.containsKey("limit"))
-                    limit = Integer.parseInt(urlQuery.get("limit"));
-                if (urlQuery.containsKey("allow_deletes"))
-                    allow_deletes = Boolean.parseBoolean(urlQuery.get("allow_deletes"));
-                if (urlQuery.containsKey("widerows"))
-                    widerows = Boolean.parseBoolean(urlQuery.get("widerows"));
-                if (urlQuery.containsKey("use_secondary"))
-                    usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
-                if (urlQuery.containsKey("split_size"))
-                    splitSize = Integer.parseInt(urlQuery.get("split_size"));
-                if (urlQuery.containsKey("partitioner"))
-                    partitionerClass = urlQuery.get("partitioner");
-                if (urlQuery.containsKey("init_address"))
-                    initHostAddress = urlQuery.get("init_address");
-                if (urlQuery.containsKey("rpc_port"))
-                    rpcPort = urlQuery.get("rpc_port");
-            }
-            String[] parts = urlParts[0].split("/+");
-            String[] credentialsAndKeyspace = parts[1].split("@");
-            if (credentialsAndKeyspace.length > 1)
-            {
-                String[] credentials = credentialsAndKeyspace[0].split(":");
-                username = credentials[0];
-                password = credentials[1];
-                keyspace = credentialsAndKeyspace[1];
-            }
-            else
-            {
-                keyspace = parts[1];
-            }
-            column_family = parts[2];
-        }
-        catch (Exception e)
-        {
-            throw new IOException("Expected 'cassandra://[username:password@]<keyspace>/<table>" +
-                    "[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]" +
-                    "[&allow_deletes=true][&widerows=true][&use_secondary=true]" +
-                    "[&comparator=<comparator>][&split_size=<size>][&partitioner=<partitioner>]" +
-                    "[&init_address=<host>][&rpc_port=<port>]]': " + e.getMessage());
-        }
-    }
-
-
-    /** decompose the query to store the parameters in a map */
-    public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
-    {
-        String[] params = query.split("&");
-        Map<String, String> map = new HashMap<String, String>(params.length);
-        for (String param : params)
-        {
-            String[] keyValue = param.split("=");
-            map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8"));
-        }
-        return map;
-    }
-
-    public ByteBuffer nullToBB()
-    {
-        return null;
-    }
-
-    /** return the CfInfo for the column family */
-    protected CfDef getCfDef(Cassandra.Client client)
-            throws TException,
-            ConfigurationException,
-            IOException
-    {
-        // get CF meta data
-        String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator " +
-                        "FROM %s.%s " +
-                        "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
-                SystemKeyspace.NAME,
-                LegacySchemaTables.COLUMNFAMILIES,
-                keyspace,
-                column_family);
-
-        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
-        if (result == null || result.rows == null || result.rows.isEmpty())
-            return null;
-
-        Iterator<CqlRow> iteraRow = result.rows.iterator();
-        CfDef cfDef = new CfDef();
-        cfDef.keyspace = keyspace;
-        cfDef.name = column_family;
-        if (iteraRow.hasNext())
-        {
-            CqlRow cqlRow = iteraRow.next();
-
-            cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
-            cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value);
-            ByteBuffer subComparator = cqlRow.columns.get(2).value;
-            if (subComparator != null)
-                cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
-            cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value);
-            cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value);
-        }
-        cfDef.column_metadata = getColumnMetadata(client);
-        return cfDef;
-    }
-
-    /** get the columnfamily definition for the signature */
-    protected CfDef getCfDef(String signature) throws IOException
-    {
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
-        String prop = property.getProperty(signature);
-        return cfdefFromString(prop);
-    }
-
-    /** convert string back to CfDef */
-    protected static CfDef cfdefFromString(String st) throws IOException
-    {
-        assert st != null;
-        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
-        CfDef cfDef = new CfDef();
-        try
-        {
-            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
-        }
-        catch (TException e)
-        {
-            throw new IOException(e);
-        }
-        return cfDef;
-    }
-
-    /** convert CfDef to string */
-    protected static String cfdefToString(CfDef cfDef) throws IOException
-    {
-        assert cfDef != null;
-        // this is so awful it's kind of cool!
-        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
-        try
-        {
-            return Hex.bytesToHex(serializer.serialize(cfDef));
-        }
-        catch (TException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    /** parse the string to a cassandra data type */
-    protected AbstractType parseType(String type) throws IOException
-    {
-        try
-        {
-            // always treat counters like longs, specifically CCT.compose is not what we need
-            if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
-                return LongType.instance;
-            return TypeParser.parse(type);
-        }
-        catch (ConfigurationException e)
-        {
-            throw new IOException(e);
-        }
-        catch (SyntaxException e)
-        {
-            throw new IOException(e);
-        }
-    }
-
-    /** convert a column to a tuple */
-    protected Tuple columnToTuple(ColumnFamilyRecordReader.Column column, CfDef cfDef, AbstractType comparator) throws IOException
-    {
-        Tuple pair = TupleFactory.getInstance().newTuple(2);
-
-        // name
-        if(comparator instanceof AbstractCompositeType)
-            StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, column.name));
-        else
-            StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, column.name, nativeProtocolVersion));
-
-        // value
-        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
-        if (validators.get(column.name) == null)
-        {
-            Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
-            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), column.value, nativeProtocolVersion));
-        }
-        else
-            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(column.name), column.value, nativeProtocolVersion));
-        return pair;
-    }
-
-    /** construct a map to store the mashaller type to cassandra data type mapping */
-    protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
-    {
-        Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class);
-        AbstractType comparator;
-        AbstractType subcomparator;
-        AbstractType default_validator;
-        AbstractType key_validator;
-
-        comparator = parseType(cfDef.getComparator_type());
-        subcomparator = parseType(cfDef.getSubcomparator_type());
-        default_validator = parseType(cfDef.getDefault_validation_class());
-        key_validator = parseType(cfDef.getKey_validation_class());
-
-        marshallers.put(MarshallerType.COMPARATOR, comparator);
-        marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator);
-        marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator);
-        marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator);
-        return marshallers;
-    }
-
-    /** get the validators */
-    protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
-    {
-        Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
-        for (ColumnDef cd : cfDef.getColumn_metadata())
-        {
-            if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
-            {
-                AbstractType validator = null;
-                try
-                {
-                    validator = TypeParser.parse(cd.getValidation_class());
-                    if (validator instanceof CounterColumnType)
-                        validator = LongType.instance;
-                    validators.put(cd.name, validator);
-                }
-                catch (ConfigurationException e)
-                {
-                    throw new IOException(e);
-                }
-                catch (SyntaxException e)
-                {
-                    throw new IOException(e);
-                }
-            }
-        }
-        return validators;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/test/pig/org/apache/cassandra/pig/PigTestBase.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/PigTestBase.java b/test/pig/org/apache/cassandra/pig/PigTestBase.java
index 8c27f6c..f556e66 100644
--- a/test/pig/org/apache/cassandra/pig/PigTestBase.java
+++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.pig;
 
 import java.io.IOException;
 
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -27,9 +29,6 @@ import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.service.EmbeddedCassandraService;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.ExecType;
@@ -37,13 +36,6 @@ import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.test.MiniCluster;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -80,13 +72,10 @@ public class PigTestBase extends SchemaLoader
         pig.shutdown();
     }
 
-    protected static Cassandra.Client getClient() throws TTransportException
+    protected static Session getClient()
     {
-        TTransport tr = new TFramedTransport(new TSocket("localhost", 9170));
-        TProtocol proto = new TBinaryProtocol(tr);
-        Cassandra.Client client = new Cassandra.Client(proto);
-        tr.open();
-        return client;
+        Cluster cluster = Cluster.builder().addContactPoints("localhost").withPort(9042).build();
+        return cluster.connect();
     }
 
     protected static void startCassandra() throws IOException
@@ -114,14 +103,14 @@ public class PigTestBase extends SchemaLoader
         }
     }
 
-    protected static void executeCQLStatements(String[] statements) throws TException
+    protected static void executeCQLStatements(String[] statements)
     {
-        Cassandra.Client client = getClient();
+        Session client = getClient();
 
         for (String statement : statements)
         {
             System.out.println("Executing statement: " + statement);
-            client.execute_cql3_query(ByteBufferUtil.bytes(statement), Compression.NONE, ConsistencyLevel.ONE);
+            client.execute(statement);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
index 3ddb94e..273cdff 100644
--- a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
+++ b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
@@ -24,10 +24,8 @@ import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.Hex;
-import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
-import org.apache.thrift.TException;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -69,7 +67,7 @@ public class ThriftColumnFamilyDataTypeTest extends PigTestBase
     };
 
     @BeforeClass
-    public static void setup() throws IOException, ConfigurationException, TException
+    public static void setup() throws IOException, ConfigurationException
     {
         startCassandra();
         executeCQLStatements(statements);
@@ -79,76 +77,74 @@ public class ThriftColumnFamilyDataTypeTest extends PigTestBase
     @Test
     public void testCassandraStorageDataType() throws IOException
     {
-        pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters + "' USING CassandraStorage();");
+        pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters + "' USING CqlNativeStorage();");
         Tuple t = pig.openIterator("rows").next();
 
         // key
         assertEquals("foo", t.get(0));
 
         // col_ascii
-        Tuple column = (Tuple) t.get(1);
-        assertEquals("ascii", column.get(1));
+        Object column = t.get(1);
+        assertEquals("ascii", column);
 
         // col_bigint
-        column = (Tuple) t.get(2);
-        assertEquals(12345678L, column.get(1));
+        column = t.get(2);
+        assertEquals(12345678L, column);
 
         // col_blob
-        column = (Tuple) t.get(3);
-        assertEquals(new DataByteArray(Hex.hexToBytes("DEADBEEF")), column.get(1));
+        column = t.get(3);
+        assertEquals(new DataByteArray(Hex.hexToBytes("DEADBEEF")), column);
 
         // col_boolean
-        column = (Tuple) t.get(4);
-        assertEquals(false, column.get(1));
+        column = t.get(4);
+        assertEquals(false, column);
 
         // col_decimal
-        column = (Tuple) t.get(5);
-        assertEquals("23.345", column.get(1));
+        column = t.get(5);
+        assertEquals("23.345", column);
 
         // col_double
-        column = (Tuple) t.get(6);
-        assertEquals(2.7182818284590451d, column.get(1));
+        column = t.get(6);
+        assertEquals(2.7182818284590451d, column);
 
         // col_float
-        column = (Tuple) t.get(7);
-        assertEquals(23.45f, column.get(1));
+        column = t.get(7);
+        assertEquals(23.45f, column);
 
         // col_inet
-        column = (Tuple) t.get(8);
-        assertEquals("127.0.0.1", column.get(1));
+        column = t.get(8);
+        assertEquals("127.0.0.1", column);
 
         // col_int
-        column = (Tuple) t.get(9);
-        assertEquals(23, column.get(1));
+        column = t.get(9);
+        assertEquals(23, column);
 
         // col_text
-        column = (Tuple) t.get(10);
-        assertEquals("hello", column.get(1));
+        column = t.get(10);
+        assertEquals("hello", column);
 
         // col_timestamp
-        column = (Tuple) t.get(11);
-        assertEquals(1296705900000L, column.get(1));
+        column = t.get(11);
+        assertEquals(1296705900000L, column);
 
         // col_timeuuid
-        column = (Tuple) t.get(12);
-        assertEquals(new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())), column.get(1));
+        column = t.get(12);
+        assertEquals(new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())), column);
 
         // col_uuid
-        column = (Tuple) t.get(13);
-        assertEquals(new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())), column.get(1));
+        column = t.get(13);
+        assertEquals(new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())), column);
 
         // col_varint
-        column = (Tuple) t.get(14);
-        assertEquals(12345, column.get(1));
+        column = t.get(14);
+        assertEquals(12345, column);
 
-        pig.registerQuery("cc_rows = LOAD 'cassandra://thrift_ks/cc?" + defaultParameters + "' USING CassandraStorage();");
+        pig.registerQuery("cc_rows = LOAD 'cql://thrift_ks/cc?" + defaultParameters + "' USING CqlNativeStorage();");
         t = pig.openIterator("cc_rows").next();
 
         assertEquals("chuck", t.get(0));
 
-        DataBag columns = (DataBag) t.get(1);
-        column = columns.iterator().next();
-        assertEquals("kick", column.get(0));
-        assertEquals(3L, column.get(1));
+        assertEquals("kick", t.get(1));
+        assertEquals(3L, t.get(2));
     }
 }