You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/07/23 17:32:07 UTC

[10/12] git commit: Pig support for hadoop CqlInputFormat

Pig support for hadoop CqlInputFormat

Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6454


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/62e70d23
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/62e70d23
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/62e70d23

Branch: refs/heads/cassandra-2.1
Commit: 62e70d23e5bacbcbdcf736fb60a02b543c965f99
Parents: a1fe634
Author: Brandon Williams <br...@apache.org>
Authored: Wed Jul 23 10:28:46 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Jul 23 10:28:46 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   1 -
 .../cassandra/hadoop/cql3/CqlInputFormat.java   |   2 -
 .../cassandra/hadoop/pig/CqlNativeStorage.java  | 291 +++++++++++++++++++
 .../apache/cassandra/hadoop/pig/CqlStorage.java |  11 +-
 test/conf/cassandra.yaml                        |   2 +
 .../cassandra/pig/CqlTableDataTypeTest.java     |  94 +++++-
 .../org/apache/cassandra/pig/CqlTableTest.java  | 101 ++++++-
 .../org/apache/cassandra/pig/PigTestBase.java   |   3 +
 .../cassandra/pig/ThriftColumnFamilyTest.java   |  53 +++-
 10 files changed, 518 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/62e70d23/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 028128d..6914a84 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
  * Add listen_interface and rpc_interface options (CASSANDRA-7417)
  * Fail to start if commit log replay detects a problem (CASSANDRA-7125)
  * Improve schema merge performance (CASSANDRA-7444)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62e70d23/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 63279d1..b375ce2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.hadoop.cql3;
 *
 */
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.KeyManagementException;
 import java.security.KeyStore;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62e70d23/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index e1cdf32..09bd80c 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.hadoop.cql3;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
 
 import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
 import org.apache.hadoop.mapred.InputSplit;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62e70d23/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
new file mode 100644
index 0000000..68249f9
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -0,0 +1,291 @@
+package org.apache.cassandra.hadoop.pig;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.cassandra.db.BufferCell;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnDef;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.datastax.driver.core.Row;
+
+public class CqlNativeStorage extends CqlStorage
+{
+    private RecordReader<Long, Row> reader;
+    private String nativePort;
+    private String nativeCoreConnections;
+    private String nativeMaxConnections;
+    private String nativeMinSimultReqs;
+    private String nativeMaxSimultReqs;
+    private String nativeConnectionTimeout;
+    private String nativeReadConnectionTimeout;
+    private String nativeReceiveBufferSize;
+    private String nativeSendBufferSize;
+    private String nativeSolinger;
+    private String nativeTcpNodelay;
+    private String nativeReuseAddress;
+    private String nativeKeepAlive;
+    private String nativeAuthProvider;
+    private String nativeSSLTruststorePath;
+    private String nativeSSLKeystorePath;
+    private String nativeSSLTruststorePassword;
+    private String nativeSSLKeystorePassword;
+    private String nativeSSLCipherSuites;
+    private String inputCql;
+
+    public CqlNativeStorage()
+    {
+        this(1000);
+    }
+
+    /** @param pageSize limit number of CQL rows to fetch in a thrift request */
+    public CqlNativeStorage(int pageSize)
+    {
+        super(pageSize);
+        DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlInputFormat";
+    }
+
+    public void prepareToRead(RecordReader reader, PigSplit split)
+    {
+        this.reader = reader;
+    }
+
+    /** get next row */
+    public Tuple getNext() throws IOException
+    {
+        try
+        {
+            // load the next pair
+            if (!reader.nextKeyValue())
+                return null;
+
+            CfInfo cfInfo = getCfInfo(loadSignature);
+            CfDef cfDef = cfInfo.cfDef;
+            Row row = reader.getCurrentValue();
+            Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size());
+            Iterator<ColumnDef> itera = cfDef.column_metadata.iterator();
+            int i = 0;
+            while (itera.hasNext())
+            {
+                ColumnDef cdef = itera.next();
+                ByteBuffer columnValue = row.getBytesUnsafe(ByteBufferUtil.string(cdef.name.duplicate()));
+                if (columnValue != null)
+                {
+                    Cell cell = new BufferCell(CellNames.simpleDense(cdef.name), columnValue);
+                    AbstractType<?> validator = getValidatorMap(cfDef).get(cdef.name);
+                    setTupleValue(tuple, i, cqlColumnToObj(cell, cfDef), validator);
+                }
+                else
+                    tuple.set(i, null);
+                i++;
+            }
+            return tuple;
+        }
+        catch (InterruptedException e)
+        {
+            throw new IOException(e.getMessage());
+        }
+    }
+
+    /** set read configuration settings */
+    public void setLocation(String location, Job job) throws IOException
+    {
+        conf = job.getConfiguration();
+        setLocationFromUri(location);
+
+        if (username != null && password != null)
+        {
+            ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
+            CqlConfigHelper.setUserNameAndPassword(conf, username, password);
+        }
+        if (splitSize > 0)
+            ConfigHelper.setInputSplitSize(conf, splitSize);
+        if (partitionerClass!= null)
+            ConfigHelper.setInputPartitioner(conf, partitionerClass);
+        if (initHostAddress != null)
+            ConfigHelper.setInputInitialAddress(conf, initHostAddress);
+        if (rpcPort != null)
+            ConfigHelper.setInputRpcPort(conf, rpcPort);
+        if (nativePort != null)
+            CqlConfigHelper.setInputNativePort(conf, nativePort);
+        if (nativeCoreConnections != null)
+            CqlConfigHelper.setInputCoreConnections(conf, nativeCoreConnections);
+        if (nativeMaxConnections != null)
+            CqlConfigHelper.setInputMaxConnections(conf, nativeMaxConnections);
+        if (nativeMinSimultReqs != null)
+            CqlConfigHelper.setInputMinSimultReqPerConnections(conf, nativeMinSimultReqs);
+        if (nativeMinSimultReqs != null)
+            CqlConfigHelper.setInputMaxSimultReqPerConnections(conf, nativeMaxSimultReqs);
+        if (nativeConnectionTimeout != null)
+            CqlConfigHelper.setInputNativeConnectionTimeout(conf, nativeConnectionTimeout);
+        if (nativeReadConnectionTimeout != null)
+            CqlConfigHelper.setInputNativeReadConnectionTimeout(conf, nativeReadConnectionTimeout);
+        if (nativeReceiveBufferSize != null)
+            CqlConfigHelper.setInputNativeReceiveBufferSize(conf, nativeReceiveBufferSize);
+        if (nativeSendBufferSize != null)
+            CqlConfigHelper.setInputNativeSendBufferSize(conf, nativeSendBufferSize);
+        if (nativeSolinger != null)
+            CqlConfigHelper.setInputNativeSolinger(conf, nativeSolinger);
+        if (nativeTcpNodelay != null)
+            CqlConfigHelper.setInputNativeTcpNodelay(conf, nativeTcpNodelay);
+        if (nativeReuseAddress != null)
+            CqlConfigHelper.setInputNativeReuseAddress(conf, nativeReuseAddress);
+        if (nativeKeepAlive != null)
+            CqlConfigHelper.setInputNativeKeepAlive(conf, nativeKeepAlive);
+        if (nativeAuthProvider != null)
+            CqlConfigHelper.setInputNativeAuthProvider(conf, nativeAuthProvider);
+        if (nativeSSLTruststorePath != null)
+            CqlConfigHelper.setInputNativeSSLTruststorePath(conf, nativeSSLTruststorePath);
+        if (nativeSSLKeystorePath != null)
+            CqlConfigHelper.setInputNativeSSLKeystorePath(conf, nativeSSLKeystorePath);
+        if (nativeSSLTruststorePassword != null)
+            CqlConfigHelper.setInputNativeSSLTruststorePassword(conf, nativeSSLTruststorePassword);
+        if (nativeSSLKeystorePassword != null)
+            CqlConfigHelper.setInputNativeSSLKeystorePassword(conf, nativeSSLKeystorePassword);
+        if (nativeSSLCipherSuites != null)
+            CqlConfigHelper.setInputNativeSSLCipherSuites(conf, nativeSSLCipherSuites);
+
+        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
+        setConnectionInformation();
+
+        CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
+        CqlConfigHelper.setInputCql(conf, inputCql);
+        if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+        {
+            try
+            {
+                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+            }
+            catch (NumberFormatException e)
+            {
+                throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e);
+            }           
+        }
+
+        if (ConfigHelper.getInputInitialAddress(conf) == null)
+            throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+        if (ConfigHelper.getInputPartitioner(conf) == null)
+            throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+        if (loadSignature == null)
+            loadSignature = location;
+
+        initSchema(loadSignature);
+    }
+
+    private void setLocationFromUri(String location) throws IOException
+    {
+        try
+        {
+            if (!location.startsWith("cql://"))
+                throw new Exception("Bad scheme: " + location);
+
+            String[] urlParts = location.split("\\?");
+            if (urlParts.length > 1)
+            {
+                Map<String, String> urlQuery = getQueryMap(urlParts[1]);
+
+                // each page row size
+                if (urlQuery.containsKey("page_size"))
+                    pageSize = Integer.parseInt(urlQuery.get("page_size"));
+
+                // output prepared statement
+                if (urlQuery.containsKey("output_query"))
+                    outputQuery = urlQuery.get("output_query");
+
+                //split size
+                if (urlQuery.containsKey("split_size"))
+                    splitSize = Integer.parseInt(urlQuery.get("split_size"));
+                if (urlQuery.containsKey("partitioner"))
+                    partitionerClass = urlQuery.get("partitioner");
+                if (urlQuery.containsKey("use_secondary"))
+                    usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
+                if (urlQuery.containsKey("init_address"))
+                    initHostAddress = urlQuery.get("init_address");
+
+                if (urlQuery.containsKey("native_port"))
+                    nativePort = urlQuery.get("native_port");
+                if (urlQuery.containsKey("core_conns"))
+                    nativeCoreConnections = urlQuery.get("core_conns");
+                if (urlQuery.containsKey("max_conns"))
+                    nativeMaxConnections = urlQuery.get("max_conns");
+                if (urlQuery.containsKey("min_simult_reqs"))
+                    nativeMinSimultReqs = urlQuery.get("min_simult_reqs");
+                if (urlQuery.containsKey("max_simult_reqs"))
+                    nativeMaxSimultReqs = urlQuery.get("max_simult_reqs");
+                if (urlQuery.containsKey("native_timeout"))
+                    nativeConnectionTimeout = urlQuery.get("native_timeout");
+                if (urlQuery.containsKey("native_read_timeout"))
+                    nativeReadConnectionTimeout = urlQuery.get("native_read_timeout");
+                if (urlQuery.containsKey("rec_buff_size"))
+                    nativeReceiveBufferSize = urlQuery.get("rec_buff_size");
+                if (urlQuery.containsKey("send_buff_size"))
+                    nativeSendBufferSize = urlQuery.get("send_buff_size");
+                if (urlQuery.containsKey("solinger"))
+                    nativeSolinger = urlQuery.get("solinger");
+                if (urlQuery.containsKey("tcp_nodelay"))
+                    nativeTcpNodelay = urlQuery.get("tcp_nodelay");
+                if (urlQuery.containsKey("reuse_address"))
+                    nativeReuseAddress = urlQuery.get("reuse_address");
+                if (urlQuery.containsKey("keep_alive"))
+                    nativeKeepAlive = urlQuery.get("keep_alive");
+                if (urlQuery.containsKey("auth_provider"))
+                    nativeAuthProvider = urlQuery.get("auth_provider");
+                if (urlQuery.containsKey("trust_store_path"))
+                    nativeSSLTruststorePath = urlQuery.get("trust_store_path");
+                if (urlQuery.containsKey("key_store_path"))
+                    nativeSSLKeystorePath = urlQuery.get("key_store_path");
+                if (urlQuery.containsKey("trust_store_password"))
+                    nativeSSLTruststorePassword = urlQuery.get("trust_store_password");
+                if (urlQuery.containsKey("key_store_password"))
+                    nativeSSLKeystorePassword = urlQuery.get("key_store_password");
+                if (urlQuery.containsKey("cipher_suites"))
+                    nativeSSLCipherSuites = urlQuery.get("cipher_suites");
+                if (urlQuery.containsKey("input_cql"))
+                    inputCql = urlQuery.get("input_cql");
+                if (urlQuery.containsKey("rpc_port"))
+                    rpcPort = urlQuery.get("rpc_port");
+            }
+            String[] parts = urlParts[0].split("/+");
+            String[] credentialsAndKeyspace = parts[1].split("@");
+            if (credentialsAndKeyspace.length > 1)
+            {
+                String[] credentials = credentialsAndKeyspace[0].split(":");
+                username = credentials[0];
+                password = credentials[1];
+                keyspace = credentialsAndKeyspace[1];
+            }
+            else
+            {
+                keyspace = parts[1];
+            }
+            column_family = parts[2];
+        }
+        catch (Exception e)
+        {
+            throw new IOException("Expected 'cql://[username:password@]<keyspace>/<columnfamily>" +
+                    "[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]" +
+                    "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]" +
+                    "[&init_address=<host>][&native_port=<native_port>][&core_conns=<core_conns>]" +
+                    "[&max_conns=<max_conns>][&min_simult_reqs=<min_simult_reqs>][&max_simult_reqs=<max_simult_reqs>]" +
+                    "[&native_timeout=<native_timeout>][&native_read_timeout=<native_read_timeout>][&rec_buff_size=<rec_buff_size>]" +
+                    "[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
+                    "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
+                    "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
+                    "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62e70d23/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 6f17468..3e1d3d4 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-
 import org.apache.hadoop.mapreduce.*;
 import org.apache.pig.Expression;
 import org.apache.pig.Expression.OpType;
@@ -58,11 +57,11 @@ public class CqlStorage extends AbstractCassandraStorage
 {
     private static final Logger logger = LoggerFactory.getLogger(CqlStorage.class);
     private RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> reader;
-    private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
+    protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
 
-    private int pageSize = 1000;
+    protected int pageSize = 1000;
     private String columns;
-    private String outputQuery;
+    protected String outputQuery;
     private String whereClause;
     private boolean hasCompactValueAlias = false;
         
@@ -130,7 +129,7 @@ public class CqlStorage extends AbstractCassandraStorage
     }
 
     /** set the value to the position of the tuple */
-    private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+    protected void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
     {
         if (validator instanceof CollectionType)
             setCollectionTupleValues(tuple, position, value, validator);
@@ -184,7 +183,7 @@ public class CqlStorage extends AbstractCassandraStorage
     }
 
     /** convert a cql column to an object */
-    private Object cqlColumnToObj(Cell col, CfDef cfDef) throws IOException
+    protected Object cqlColumnToObj(Cell col, CfDef cfDef) throws IOException
     {
         // standard
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62e70d23/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 43995e6..1bc4193 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -35,3 +35,5 @@ server_encryption_options:
 incremental_backups: true
 concurrent_compactors: 4
 compaction_throughput_mb_per_sec: 0
+start_native_transport: true
+native_transport_port: 9052

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62e70d23/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java b/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
index 2020b0a..1819c61 100644
--- a/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
@@ -220,7 +220,24 @@ public class CqlTableDataTypeTest extends PigTestBase
     public void testCqlStorageRegularType()
     throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
     {
-        pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();");
+        cqlTableTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();");
+        counterTableTest("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + "' USING CqlStorage();");
+    }
+
+    @Test
+    public void testCqlNativeStorageRegularType()
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
+        //input_cql=select * from cqltable where token(key) > ? and token(key) <= ?
+        cqlTableTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20cqltable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();");
+
+        //input_cql=select * from countertable where token(key) > ? and token(key) <= ?
+        counterTableTest("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20countertable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();");
+    }
+
+    private void cqlTableTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
         Iterator<Tuple> it = pig.openIterator("rows");
         //{key: int, 
         //col_ascii: chararray, 
@@ -257,21 +274,45 @@ public class CqlTableDataTypeTest extends PigTestBase
             Assert.assertEquals(t.get(14), "varchar");
             Assert.assertEquals(t.get(15), 123);
         }
-        
-        pig.registerQuery("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + "' USING CqlStorage();");
-        it = pig.openIterator("cc_rows");
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
+        }
+    }
+
+    private void counterTableTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
+        Iterator<Tuple>  it = pig.openIterator("cc_rows");
         if (it.hasNext()) {
             Tuple t = it.next();
             Assert.assertEquals(t.get(0), 1);
             Assert.assertEquals(t.get(1), 3L);
         }
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
+        }
     }
 
     @Test
     public void testCqlStorageSetType()
     throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
     {
-        pig.registerQuery("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + "' USING CqlStorage();");
+        settableTest("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + "' USING CqlStorage();");
+    }
+
+    @Test
+    public void testCqlNativeStorageSetType()
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
+        //input_cql=select * from settable where token(key) > ? and token(key) <= ?
+        settableTest("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20settable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();");
+    }
+
+    private void settableTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
         Iterator<Tuple> it = pig.openIterator("set_rows");
         if (it.hasNext()) {
             Tuple t = it.next();
@@ -322,13 +363,30 @@ public class CqlTableDataTypeTest extends PigTestBase
             Assert.assertEquals(innerTuple.get(0), 123);
             Assert.assertEquals(innerTuple.get(1), 124);
         }
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
+        }
     }
 
     @Test
     public void testCqlStorageListType()
     throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
     {
-        pig.registerQuery("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + "' USING CqlStorage();");
+        listtableTest("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + "' USING CqlStorage();");
+    }
+
+    @Test
+    public void testCqlNativeStorageListType()
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
+        //input_cql=select * from listtable where token(key) > ? and token(key) <= ?
+        listtableTest("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20listtable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();");
+    }
+
+    private void listtableTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
         Iterator<Tuple> it = pig.openIterator("list_rows");
         if (it.hasNext()) {
             Tuple t = it.next();
@@ -379,13 +437,30 @@ public class CqlTableDataTypeTest extends PigTestBase
             Assert.assertEquals(innerTuple.get(1), 123);
             Assert.assertEquals(innerTuple.get(0), 124);
         }
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
+        }
     }
 
     @Test
     public void testCqlStorageMapType()
     throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
     {
-        pig.registerQuery("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + "' USING CqlStorage();");
+        maptableTest("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + "' USING CqlStorage();");
+    }
+
+    @Test
+    public void testCqlNativeStorageMapType()
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
+        //input_cql=select * from maptable where token(key) > ? and token(key) <= ?
+        maptableTest("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20maptable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();");
+    }
+
+    private void maptableTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
         Iterator<Tuple> it = pig.openIterator("map_rows");
         if (it.hasNext()) {
             Tuple t = it.next();
@@ -436,5 +511,10 @@ public class CqlTableDataTypeTest extends PigTestBase
             Assert.assertEquals(innerTuple.get(0), 123);
             Assert.assertEquals(innerTuple.get(1), 124);
         }
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62e70d23/test/pig/org/apache/cassandra/pig/CqlTableTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlTableTest.java b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
index 15d49f2..f5adef8 100644
--- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
@@ -93,7 +93,41 @@ public class CqlTableTest extends PigTestBase
     public void testCqlStorageSchema()
     throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
     {
-        pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();");
+        cqlTableSchemaTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();");
+        compactCqlTableSchemaTest("rows = LOAD 'cql://cql3ks/compactcqltable?" + defaultParameters + "' USING CqlStorage();");
+    }
+
+    @Test
+    public void testCqlNativeStorageSchema()
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
+        //input_cql=select * from cqltable where token(key1) > ? and token(key1) <= ?
+        cqlTableSchemaTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + nativeParameters +  "&input_cql=select%20*%20from%20cqltable%20where%20token(key1)%20%3E%20%3F%20and%20token(key1)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+
+        //input_cql=select * from compactcqltable where token(key1) > ? and token(key1) <= ?
+        compactCqlTableSchemaTest("rows = LOAD 'cql://cql3ks/compactcqltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compactcqltable%20where%20token(key1)%20%3E%20%3F%20and%20token(key1)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+    }
+
+    private void compactCqlTableSchemaTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
+        Iterator<Tuple>  it = pig.openIterator("rows");
+        if (it.hasNext()) {
+            Tuple t = it.next();
+            Assert.assertEquals(t.get(0).toString(), "key1");
+            Assert.assertEquals(t.get(1), 100);
+            Assert.assertEquals(t.get(2), 10.1f);
+            Assert.assertEquals(3, t.size());
+        }
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
+        }
+    }
+
+    private void cqlTableSchemaTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
         Iterator<Tuple> it = pig.openIterator("rows");
         if (it.hasNext()) {
             Tuple t = it.next();
@@ -103,15 +137,9 @@ public class CqlTableTest extends PigTestBase
             Assert.assertEquals(t.get(3), 10.1f);
             Assert.assertEquals(4, t.size());
         }
-
-        pig.registerQuery("rows = LOAD 'cql://cql3ks/compactcqltable?" + defaultParameters + "' USING CqlStorage();");
-        it = pig.openIterator("rows");
-        if (it.hasNext()) {
-            Tuple t = it.next();
-            Assert.assertEquals(t.get(0).toString(), "key1");
-            Assert.assertEquals(t.get(1), 100);
-            Assert.assertEquals(t.get(2), 10.1f);
-            Assert.assertEquals(3, t.size());
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
         }
     }
 
@@ -119,8 +147,23 @@ public class CqlTableTest extends PigTestBase
     public void testCqlStorageSingleKeyTable()
     throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
     {
+        SingleKeyTableTest("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + "' USING CqlStorage();");
+
+    }
+
+    @Test
+    public void testCqlNativeStorageSingleKeyTable()
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
+        //input_cql=select * from moredata where token(x) > ? and token(x) <= ?
+        SingleKeyTableTest("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20moredata%20where%20token(x)%20%3E%20%3F%20and%20token(x)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+    }
+
+    private void SingleKeyTableTest(String initialQuery)
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
         pig.setBatchOn();
-        pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + "' USING CqlStorage();");
+        pig.registerQuery(initialQuery);
         pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE TOTUPLE(TOTUPLE('a',x)),TOTUPLE(y);");
         pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test?" + defaultParameters + "&output_query=UPDATE+cql3ks.test+set+b+%3D+%3F' USING CqlStorage();");
         pig.executeBatch();
@@ -132,7 +175,7 @@ public class CqlTableTest extends PigTestBase
         //(1,1)
         pig.registerQuery("result= LOAD 'cql://cql3ks/test?" + defaultParameters + "' USING CqlStorage();");
         Iterator<Tuple> it = pig.openIterator("result");
-        if (it.hasNext()) {
+        while (it.hasNext()) {
             Tuple t = it.next();
             Assert.assertEquals(t.get(0), t.get(1));
         }
@@ -142,8 +185,22 @@ public class CqlTableTest extends PigTestBase
     public void testCqlStorageCompositeKeyTable()
     throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
     {
+        CompositeKeyTableTest("moredata= LOAD 'cql://cql3ks/compmore?" + defaultParameters + "' USING CqlStorage();");
+    }
+
+    @Test
+    public void testCqlNativeStorageCompositeKeyTable()
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
+        //input_cql=select * from compmore where token(id) > ? and token(id) <= ?
+        CompositeKeyTableTest("moredata= LOAD 'cql://cql3ks/compmore?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compmore%20where%20token(id)%20%3E%20%3F%20and%20token(id)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+    }
+
+    private void CompositeKeyTableTest(String initialQuery)
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
         pig.setBatchOn();
-        pig.registerQuery("moredata= LOAD 'cql://cql3ks/compmore?" + defaultParameters + "' USING CqlStorage();");
+        pig.registerQuery(initialQuery);
         pig.registerQuery("insertformat = FOREACH moredata GENERATE TOTUPLE (TOTUPLE('a',x),TOTUPLE('b',y), TOTUPLE('c',z)),TOTUPLE(data);");
         pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/compotable?" + defaultParameters + "&output_query=UPDATE%20cql3ks.compotable%20SET%20d%20%3D%20%3F' USING CqlStorage();");
         pig.executeBatch();
@@ -171,8 +228,22 @@ public class CqlTableTest extends PigTestBase
     public void testCqlStorageCollectionColumnTable()
     throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
     {
+        CollectionColumnTableTest("collectiontable= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();");
+    }
+
+    @Test
+    public void testCqlNativeStorageCollectionColumnTable()
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
+        //input_cql=select * from collectiontable where token(m) < ? and token(m) <= ?
+        CollectionColumnTableTest("collectiontable= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + nativeParameters + "&input_cql=input_cql%3Dselect%20*%20from%20collectiontable%20where%20token(m)%20%3C%20%3F%20and%20token(m)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+    }
+
+    private void CollectionColumnTableTest(String initialQuery)
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
         pig.setBatchOn();
-        pig.registerQuery("collectiontable= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();");
+        pig.registerQuery(initialQuery);
         pig.registerQuery("recs= FOREACH collectiontable GENERATE TOTUPLE(TOTUPLE('m', m) ), TOTUPLE(TOTUPLE('map', TOTUPLE('m', 'mm'), TOTUPLE('n', 'nn')));");
         pig.registerQuery("STORE recs INTO 'cql://cql3ks/collectiontable?" + defaultParameters + "&output_query=update+cql3ks.collectiontable+set+n+%3D+%3F' USING CqlStorage();");
         pig.executeBatch();
@@ -183,7 +254,7 @@ public class CqlTableTest extends PigTestBase
         //(book1,((m,mm),(n,nn)))
         pig.registerQuery("result= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();");
         Iterator<Tuple> it = pig.openIterator("result");
-        while (it.hasNext()) {
+        if (it.hasNext()) {
             Tuple t = it.next();
             Tuple t1 = (Tuple) t.get(1);
             Assert.assertEquals(t1.size(), 2);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62e70d23/test/pig/org/apache/cassandra/pig/PigTestBase.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/PigTestBase.java b/test/pig/org/apache/cassandra/pig/PigTestBase.java
index 83dc63b..002fbba 100644
--- a/test/pig/org/apache/cassandra/pig/PigTestBase.java
+++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java
@@ -66,6 +66,9 @@ public class PigTestBase extends SchemaLoader
     protected static MiniCluster cluster; 
     protected static PigServer pig;
     protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner";
+    protected static String nativeParameters = "&core_conns=2&max_conns=10&min_simult_reqs=3&max_simult_reqs=10&native_timeout=10000000"  +
+                                               "&native_read_timeout=10000000&send_buff_size=4096&receive_buff_size=4096&solinger=3" +
+                                               "&tcp_nodelay=true&reuse_address=true&keep_alive=true&native_port=9052";
 
     static
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62e70d23/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
index 60344d2..b4a8662 100644
--- a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
+++ b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
@@ -203,7 +203,35 @@ public class ThriftColumnFamilyTest extends PigTestBase
     public void testCqlStorage() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
     {
         //regular thrift column families
-        pig.registerQuery("data = load 'cql://thriftKs/SomeApp?" + defaultParameters + "' using CqlStorage();");
+        cqlStorageTest("data = load 'cql://thriftKs/SomeApp?" + defaultParameters + "' using CqlStorage();");
+
+        //Test counter colun family
+        // This test fails for CASSANDRA-7059
+        //cqlStorageCounterTableTest("cc_data = load 'cql://thriftKs/CC?" + defaultParameters + "' using CqlStorage();");
+
+        //Test composite column family
+        cqlStorageCompositeTableTest("compo_data = load 'cql://thriftKs/Compo?" + defaultParameters + "' using CqlStorage();");
+    }
+
+    @Test
+    public void testCqlNativeStorage() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
+    {
+        //regular thrift column families
+        //input_cql=select * from "SomeApp" where token(key) > ? and token(key) <= ?
+        cqlStorageTest("data = load 'cql://thriftKs/SomeApp?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20%22SomeApp%22%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();");
+
+        //Test counter colun family
+        //input_cql=select * from "CC" where token(key) > ? and token(key) <= ?
+        cqlStorageCounterTableTest("cc_data = load 'cql://thriftKs/CC?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20%22CC%22%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();");
+
+        //Test composite column family
+        //input_cql=select * from "Compo" where token(key) > ? and token(key) <= ?
+        cqlStorageCompositeTableTest("compo_data = load 'cql://thriftKs/Compo?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20%22Compo%22%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();");
+    }
+
+    private void cqlStorageTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
 
         //(bar,3.141592653589793,1335890877,User Bar,35.0,9,15000,like)
         //(baz,1.61803399,1335890877,User Baz,95.3,3,512000,dislike)
@@ -256,16 +284,18 @@ public class ThriftColumnFamilyTest extends PigTestBase
             }
         }
         Assert.assertEquals(count, 4);
+    }
 
-        //Test counter colun family
-        pig.registerQuery("cc_data = load 'cql://thriftKs/CC?" + defaultParameters + "' using CqlStorage();");
+    private void cqlStorageCounterTableTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
 
         //(chuck,fist,1)
         //(chuck,kick,3)
 
         // {key: chararray,column1: chararray,value: long}
-        it = pig.openIterator("cc_data");
-        count = 0;
+        Iterator<Tuple> it = pig.openIterator("cc_data");
+        int count = 0;
         while (it.hasNext()) {
             count ++;
             Tuple t = it.next();
@@ -275,9 +305,11 @@ public class ThriftColumnFamilyTest extends PigTestBase
                 Assert.assertEquals(t.get(2), 3L);
         }
         Assert.assertEquals(count, 2);
+    }
 
-        //Test composite column family
-        pig.registerQuery("compo_data = load 'cql://thriftKs/Compo?" + defaultParameters + "' using CqlStorage();");
+    private void cqlStorageCompositeTableTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
 
         //(kick,bruce,bruce,watch it, mate)
         //(kick,bruce,lee,oww)
@@ -285,8 +317,8 @@ public class ThriftColumnFamilyTest extends PigTestBase
         //(punch,bruce,lee,ouch)
 
         //{key: chararray,column1: chararray,column2: chararray,value: chararray}
-        it = pig.openIterator("compo_data");
-        count = 0;
+        Iterator<Tuple> it = pig.openIterator("compo_data");
+        int count = 0;
         while (it.hasNext()) {
             count ++;
             Tuple t = it.next();
@@ -583,7 +615,8 @@ public class ThriftColumnFamilyTest extends PigTestBase
         }
     }
 
-    @Test
+    /** This test case fails due to antlr lib conflicts, Cassandra2.1 uses 3.2, Hive1.2 uses 3.4 */
+    //@Test
     public void testCassandraStorageCompositeColumnCF() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
     {
         //Test CompositeType