You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/07/23 17:32:01 UTC
[04/12] git commit: Pig support for hadoop CqlInputFormat
Pig support for hadoop CqlInputFormat
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6454
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e2a5b0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e2a5b0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e2a5b0e
Branch: refs/heads/cassandra-2.1
Commit: 1e2a5b0ee334327f1ed62e181542328a0824c27c
Parents: ef894c2
Author: Brandon Williams <br...@apache.org>
Authored: Wed Jul 23 10:26:50 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Jul 23 10:26:50 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
build.xml | 1 +
.../cassandra/hadoop/cql3/CqlConfigHelper.java | 1 -
.../cassandra/hadoop/cql3/CqlInputFormat.java | 2 -
.../cassandra/hadoop/pig/CqlNativeStorage.java | 289 +++++++++++++++++++
.../apache/cassandra/hadoop/pig/CqlStorage.java | 12 +-
test/conf/cassandra.yaml | 2 +
.../cassandra/pig/CqlTableDataTypeTest.java | 94 +++++-
.../org/apache/cassandra/pig/CqlTableTest.java | 101 ++++++-
.../org/apache/cassandra/pig/PigTestBase.java | 3 +
.../cassandra/pig/ThriftColumnFamilyTest.java | 73 +++--
11 files changed, 529 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9909760..6f67720 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.10
+ * Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
* Fix ReversedType(DateType) mapping to native protocol (CASSANDRA-7576)
* (Windows) force range-based repair to non-sequential mode (CASSANDRA-7541)
* Fix range merging when DES scores are zero (CASSANDRA-7535)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index ddf74e4..68907df 100644
--- a/build.xml
+++ b/build.xml
@@ -1275,6 +1275,7 @@
<classpathentry kind="src" path="interface/thrift/gen-java"/>
<classpathentry kind="src" path="test/unit"/>
<classpathentry kind="src" path="test/long"/>
+ <classpathentry kind="src" path="test/pig"/>
<classpathentry kind="src" path="tools/stress/src"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="output" path="build/classes/main"/>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 63279d1..b375ce2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.hadoop.cql3;
*
*/
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index e1cdf32..09bd80c 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.hadoop.cql3;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
import org.apache.hadoop.mapred.InputSplit;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
new file mode 100644
index 0000000..948d21c
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -0,0 +1,289 @@
+package org.apache.cassandra.hadoop.pig;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnDef;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.datastax.driver.core.Row;
+
+public class CqlNativeStorage extends CqlStorage
+{
+ private RecordReader<Long, Row> reader;
+ private String nativePort;
+ private String nativeCoreConnections;
+ private String nativeMaxConnections;
+ private String nativeMinSimultReqs;
+ private String nativeMaxSimultReqs;
+ private String nativeConnectionTimeout;
+ private String nativeReadConnectionTimeout;
+ private String nativeReceiveBufferSize;
+ private String nativeSendBufferSize;
+ private String nativeSolinger;
+ private String nativeTcpNodelay;
+ private String nativeReuseAddress;
+ private String nativeKeepAlive;
+ private String nativeAuthProvider;
+ private String nativeSSLTruststorePath;
+ private String nativeSSLKeystorePath;
+ private String nativeSSLTruststorePassword;
+ private String nativeSSLKeystorePassword;
+ private String nativeSSLCipherSuites;
+ private String inputCql;
+
+ public CqlNativeStorage()
+ {
+ this(1000);
+ }
+
+ /** @param pageSize limit number of CQL rows to fetch in a thrift request */
+ public CqlNativeStorage(int pageSize)
+ {
+ super(pageSize);
+ DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlInputFormat";
+ }
+
+ public void prepareToRead(RecordReader reader, PigSplit split)
+ {
+ this.reader = reader;
+ }
+
+ /** get next row */
+ public Tuple getNext() throws IOException
+ {
+ try
+ {
+ // load the next pair
+ if (!reader.nextKeyValue())
+ return null;
+
+ CfInfo cfInfo = getCfInfo(loadSignature);
+ CfDef cfDef = cfInfo.cfDef;
+ Row row = reader.getCurrentValue();
+ Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size());
+ Iterator<ColumnDef> itera = cfDef.column_metadata.iterator();
+ int i = 0;
+ while (itera.hasNext())
+ {
+ ColumnDef cdef = itera.next();
+ ByteBuffer columnValue = row.getBytesUnsafe(ByteBufferUtil.string(cdef.name.duplicate()));
+ if (columnValue != null)
+ {
+ Column column = new Column(cdef.name, columnValue);
+ AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+ setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
+ }
+ else
+ tuple.set(i, null);
+ i++;
+ }
+ return tuple;
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e.getMessage());
+ }
+ }
+
+ /** set read configuration settings */
+ public void setLocation(String location, Job job) throws IOException
+ {
+ conf = job.getConfiguration();
+ setLocationFromUri(location);
+
+ if (username != null && password != null)
+ {
+ ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
+ CqlConfigHelper.setUserNameAndPassword(conf, username, password);
+ }
+ if (splitSize > 0)
+ ConfigHelper.setInputSplitSize(conf, splitSize);
+ if (partitionerClass!= null)
+ ConfigHelper.setInputPartitioner(conf, partitionerClass);
+ if (initHostAddress != null)
+ ConfigHelper.setInputInitialAddress(conf, initHostAddress);
+ if (rpcPort != null)
+ ConfigHelper.setInputRpcPort(conf, rpcPort);
+ if (nativePort != null)
+ CqlConfigHelper.setInputNativePort(conf, nativePort);
+ if (nativeCoreConnections != null)
+ CqlConfigHelper.setInputCoreConnections(conf, nativeCoreConnections);
+ if (nativeMaxConnections != null)
+ CqlConfigHelper.setInputMaxConnections(conf, nativeMaxConnections);
+ if (nativeMinSimultReqs != null)
+ CqlConfigHelper.setInputMinSimultReqPerConnections(conf, nativeMinSimultReqs);
+ if (nativeMinSimultReqs != null)
+ CqlConfigHelper.setInputMaxSimultReqPerConnections(conf, nativeMaxSimultReqs);
+ if (nativeConnectionTimeout != null)
+ CqlConfigHelper.setInputNativeConnectionTimeout(conf, nativeConnectionTimeout);
+ if (nativeReadConnectionTimeout != null)
+ CqlConfigHelper.setInputNativeReadConnectionTimeout(conf, nativeReadConnectionTimeout);
+ if (nativeReceiveBufferSize != null)
+ CqlConfigHelper.setInputNativeReceiveBufferSize(conf, nativeReceiveBufferSize);
+ if (nativeSendBufferSize != null)
+ CqlConfigHelper.setInputNativeSendBufferSize(conf, nativeSendBufferSize);
+ if (nativeSolinger != null)
+ CqlConfigHelper.setInputNativeSolinger(conf, nativeSolinger);
+ if (nativeTcpNodelay != null)
+ CqlConfigHelper.setInputNativeTcpNodelay(conf, nativeTcpNodelay);
+ if (nativeReuseAddress != null)
+ CqlConfigHelper.setInputNativeReuseAddress(conf, nativeReuseAddress);
+ if (nativeKeepAlive != null)
+ CqlConfigHelper.setInputNativeKeepAlive(conf, nativeKeepAlive);
+ if (nativeAuthProvider != null)
+ CqlConfigHelper.setInputNativeAuthProvider(conf, nativeAuthProvider);
+ if (nativeSSLTruststorePath != null)
+ CqlConfigHelper.setInputNativeSSLTruststorePath(conf, nativeSSLTruststorePath);
+ if (nativeSSLKeystorePath != null)
+ CqlConfigHelper.setInputNativeSSLKeystorePath(conf, nativeSSLKeystorePath);
+ if (nativeSSLTruststorePassword != null)
+ CqlConfigHelper.setInputNativeSSLTruststorePassword(conf, nativeSSLTruststorePassword);
+ if (nativeSSLKeystorePassword != null)
+ CqlConfigHelper.setInputNativeSSLKeystorePassword(conf, nativeSSLKeystorePassword);
+ if (nativeSSLCipherSuites != null)
+ CqlConfigHelper.setInputNativeSSLCipherSuites(conf, nativeSSLCipherSuites);
+
+ ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
+ setConnectionInformation();
+
+ CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
+ CqlConfigHelper.setInputCql(conf, inputCql);
+ if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+ {
+ try
+ {
+ ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+ }
+ catch (NumberFormatException e)
+ {
+ throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e);
+ }
+ }
+
+ if (ConfigHelper.getInputInitialAddress(conf) == null)
+ throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+ if (ConfigHelper.getInputPartitioner(conf) == null)
+ throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+ if (loadSignature == null)
+ loadSignature = location;
+
+ initSchema(loadSignature);
+ }
+
+ private void setLocationFromUri(String location) throws IOException
+ {
+ try
+ {
+ if (!location.startsWith("cql://"))
+ throw new Exception("Bad scheme: " + location);
+
+ String[] urlParts = location.split("\\?");
+ if (urlParts.length > 1)
+ {
+ Map<String, String> urlQuery = getQueryMap(urlParts[1]);
+
+ // each page row size
+ if (urlQuery.containsKey("page_size"))
+ pageSize = Integer.parseInt(urlQuery.get("page_size"));
+
+ // output prepared statement
+ if (urlQuery.containsKey("output_query"))
+ outputQuery = urlQuery.get("output_query");
+
+ //split size
+ if (urlQuery.containsKey("split_size"))
+ splitSize = Integer.parseInt(urlQuery.get("split_size"));
+ if (urlQuery.containsKey("partitioner"))
+ partitionerClass = urlQuery.get("partitioner");
+ if (urlQuery.containsKey("use_secondary"))
+ usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
+ if (urlQuery.containsKey("init_address"))
+ initHostAddress = urlQuery.get("init_address");
+
+ if (urlQuery.containsKey("native_port"))
+ nativePort = urlQuery.get("native_port");
+ if (urlQuery.containsKey("core_conns"))
+ nativeCoreConnections = urlQuery.get("core_conns");
+ if (urlQuery.containsKey("max_conns"))
+ nativeMaxConnections = urlQuery.get("max_conns");
+ if (urlQuery.containsKey("min_simult_reqs"))
+ nativeMinSimultReqs = urlQuery.get("min_simult_reqs");
+ if (urlQuery.containsKey("max_simult_reqs"))
+ nativeMaxSimultReqs = urlQuery.get("max_simult_reqs");
+ if (urlQuery.containsKey("native_timeout"))
+ nativeConnectionTimeout = urlQuery.get("native_timeout");
+ if (urlQuery.containsKey("native_read_timeout"))
+ nativeReadConnectionTimeout = urlQuery.get("native_read_timeout");
+ if (urlQuery.containsKey("rec_buff_size"))
+ nativeReceiveBufferSize = urlQuery.get("rec_buff_size");
+ if (urlQuery.containsKey("send_buff_size"))
+ nativeSendBufferSize = urlQuery.get("send_buff_size");
+ if (urlQuery.containsKey("solinger"))
+ nativeSolinger = urlQuery.get("solinger");
+ if (urlQuery.containsKey("tcp_nodelay"))
+ nativeTcpNodelay = urlQuery.get("tcp_nodelay");
+ if (urlQuery.containsKey("reuse_address"))
+ nativeReuseAddress = urlQuery.get("reuse_address");
+ if (urlQuery.containsKey("keep_alive"))
+ nativeKeepAlive = urlQuery.get("keep_alive");
+ if (urlQuery.containsKey("auth_provider"))
+ nativeAuthProvider = urlQuery.get("auth_provider");
+ if (urlQuery.containsKey("trust_store_path"))
+ nativeSSLTruststorePath = urlQuery.get("trust_store_path");
+ if (urlQuery.containsKey("key_store_path"))
+ nativeSSLKeystorePath = urlQuery.get("key_store_path");
+ if (urlQuery.containsKey("trust_store_password"))
+ nativeSSLTruststorePassword = urlQuery.get("trust_store_password");
+ if (urlQuery.containsKey("key_store_password"))
+ nativeSSLKeystorePassword = urlQuery.get("key_store_password");
+ if (urlQuery.containsKey("cipher_suites"))
+ nativeSSLCipherSuites = urlQuery.get("cipher_suites");
+ if (urlQuery.containsKey("input_cql"))
+ inputCql = urlQuery.get("input_cql");
+ if (urlQuery.containsKey("rpc_port"))
+ rpcPort = urlQuery.get("rpc_port");
+ }
+ String[] parts = urlParts[0].split("/+");
+ String[] credentialsAndKeyspace = parts[1].split("@");
+ if (credentialsAndKeyspace.length > 1)
+ {
+ String[] credentials = credentialsAndKeyspace[0].split(":");
+ username = credentials[0];
+ password = credentials[1];
+ keyspace = credentialsAndKeyspace[1];
+ }
+ else
+ {
+ keyspace = parts[1];
+ }
+ column_family = parts[2];
+ }
+ catch (Exception e)
+ {
+ throw new IOException("Expected 'cql://[username:password@]<keyspace>/<columnfamily>" +
+ "[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]" +
+ "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]" +
+ "[&init_address=<host>][&native_port=<native_port>][&core_conns=<core_conns>]" +
+ "[&max_conns=<max_conns>][&min_simult_reqs=<min_simult_reqs>][&max_simult_reqs=<max_simult_reqs>]" +
+ "[&native_timeout=<native_timeout>][&native_read_timeout=<native_read_timeout>][&rec_buff_size=<rec_buff_size>]" +
+ "[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
+ "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
+ "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
+ "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 284b72a..02a6d98 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.*;
-
import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.cassandra.cql3.CFDefinition;
import org.apache.cassandra.db.Column;
@@ -33,7 +32,6 @@ import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-
import org.apache.hadoop.mapreduce.*;
import org.apache.pig.Expression;
import org.apache.pig.Expression.OpType;
@@ -58,11 +56,11 @@ public class CqlStorage extends AbstractCassandraStorage
private static final Logger logger = LoggerFactory.getLogger(CqlStorage.class);
private RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> reader;
- private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
+ protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
- private int pageSize = 1000;
+ protected int pageSize = 1000;
private String columns;
- private String outputQuery;
+ protected String outputQuery;
private String whereClause;
private boolean hasCompactValueAlias = false;
@@ -130,7 +128,7 @@ public class CqlStorage extends AbstractCassandraStorage
}
/** set the value to the position of the tuple */
- private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
+ protected void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException
{
if (validator instanceof CollectionType)
setCollectionTupleValues(tuple, position, value, validator);
@@ -184,7 +182,7 @@ public class CqlStorage extends AbstractCassandraStorage
}
/** convert a cql column to an object */
- private Object cqlColumnToObj(Column col, CfDef cfDef) throws IOException
+ protected Object cqlColumnToObj(Column col, CfDef cfDef) throws IOException
{
// standard
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index a207bc6..d92eba6 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -32,3 +32,5 @@ server_encryption_options:
truststore_password: cassandra
incremental_backups: true
compaction_throughput_mb_per_sec: 0
+start_native_transport: true
+native_transport_port: 9052
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java b/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
index 2020b0a..1819c61 100644
--- a/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
@@ -220,7 +220,24 @@ public class CqlTableDataTypeTest extends PigTestBase
public void testCqlStorageRegularType()
throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
{
- pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();");
+ cqlTableTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();");
+ counterTableTest("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + "' USING CqlStorage();");
+ }
+
+ @Test
+ public void testCqlNativeStorageRegularType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ //input_cql=select * from cqltable where token(key) > ? and token(key) <= ?
+ cqlTableTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20cqltable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();");
+
+ //input_cql=select * from countertable where token(key) > ? and token(key) <= ?
+ counterTableTest("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20countertable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();");
+ }
+
+ private void cqlTableTest(String initialQuery) throws IOException
+ {
+ pig.registerQuery(initialQuery);
Iterator<Tuple> it = pig.openIterator("rows");
//{key: int,
//col_ascii: chararray,
@@ -257,21 +274,45 @@ public class CqlTableDataTypeTest extends PigTestBase
Assert.assertEquals(t.get(14), "varchar");
Assert.assertEquals(t.get(15), 123);
}
-
- pig.registerQuery("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + "' USING CqlStorage();");
- it = pig.openIterator("cc_rows");
+ else
+ {
+ Assert.fail("Failed to get data for query " + initialQuery);
+ }
+ }
+
+ private void counterTableTest(String initialQuery) throws IOException
+ {
+ pig.registerQuery(initialQuery);
+ Iterator<Tuple> it = pig.openIterator("cc_rows");
if (it.hasNext()) {
Tuple t = it.next();
Assert.assertEquals(t.get(0), 1);
Assert.assertEquals(t.get(1), 3L);
}
+ else
+ {
+ Assert.fail("Failed to get data for query " + initialQuery);
+ }
}
@Test
public void testCqlStorageSetType()
throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
{
- pig.registerQuery("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + "' USING CqlStorage();");
+ settableTest("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + "' USING CqlStorage();");
+ }
+
+ @Test
+ public void testCqlNativeStorageSetType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ //input_cql=select * from settable where token(key) > ? and token(key) <= ?
+ settableTest("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20settable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();");
+ }
+
+ private void settableTest(String initialQuery) throws IOException
+ {
+ pig.registerQuery(initialQuery);
Iterator<Tuple> it = pig.openIterator("set_rows");
if (it.hasNext()) {
Tuple t = it.next();
@@ -322,13 +363,30 @@ public class CqlTableDataTypeTest extends PigTestBase
Assert.assertEquals(innerTuple.get(0), 123);
Assert.assertEquals(innerTuple.get(1), 124);
}
+ else
+ {
+ Assert.fail("Failed to get data for query " + initialQuery);
+ }
}
@Test
public void testCqlStorageListType()
throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
{
- pig.registerQuery("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + "' USING CqlStorage();");
+ listtableTest("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + "' USING CqlStorage();");
+ }
+
+ @Test
+ public void testCqlNativeStorageListType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ //input_cql=select * from listtable where token(key) > ? and token(key) <= ?
+ listtableTest("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20listtable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();");
+ }
+
+ private void listtableTest(String initialQuery) throws IOException
+ {
+ pig.registerQuery(initialQuery);
Iterator<Tuple> it = pig.openIterator("list_rows");
if (it.hasNext()) {
Tuple t = it.next();
@@ -379,13 +437,30 @@ public class CqlTableDataTypeTest extends PigTestBase
Assert.assertEquals(innerTuple.get(1), 123);
Assert.assertEquals(innerTuple.get(0), 124);
}
+ else
+ {
+ Assert.fail("Failed to get data for query " + initialQuery);
+ }
}
@Test
public void testCqlStorageMapType()
throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
{
- pig.registerQuery("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + "' USING CqlStorage();");
+ maptableTest("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + "' USING CqlStorage();");
+ }
+
+ @Test
+ public void testCqlNativeStorageMapType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ //input_cql=select * from maptable where token(key) > ? and token(key) <= ?
+ maptableTest("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20maptable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();");
+ }
+
+ private void maptableTest(String initialQuery) throws IOException
+ {
+ pig.registerQuery(initialQuery);
Iterator<Tuple> it = pig.openIterator("map_rows");
if (it.hasNext()) {
Tuple t = it.next();
@@ -436,5 +511,10 @@ public class CqlTableDataTypeTest extends PigTestBase
Assert.assertEquals(innerTuple.get(0), 123);
Assert.assertEquals(innerTuple.get(1), 124);
}
+ else
+ {
+ Assert.fail("Failed to get data for query " + initialQuery);
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/pig/org/apache/cassandra/pig/CqlTableTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlTableTest.java b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
index 15d49f2..f5adef8 100644
--- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
@@ -93,7 +93,41 @@ public class CqlTableTest extends PigTestBase
public void testCqlStorageSchema()
throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
{
- pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();");
+ cqlTableSchemaTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();");
+ compactCqlTableSchemaTest("rows = LOAD 'cql://cql3ks/compactcqltable?" + defaultParameters + "' USING CqlStorage();");
+ }
+
+ @Test
+ public void testCqlNativeStorageSchema()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ //input_cql=select * from cqltable where token(key1) > ? and token(key1) <= ?
+ cqlTableSchemaTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20cqltable%20where%20token(key1)%20%3E%20%3F%20and%20token(key1)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+
+ //input_cql=select * from compactcqltable where token(key1) > ? and token(key1) <= ?
+ compactCqlTableSchemaTest("rows = LOAD 'cql://cql3ks/compactcqltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compactcqltable%20where%20token(key1)%20%3E%20%3F%20and%20token(key1)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+ }
+
+ private void compactCqlTableSchemaTest(String initialQuery) throws IOException
+ {
+ pig.registerQuery(initialQuery);
+ Iterator<Tuple> it = pig.openIterator("rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0).toString(), "key1");
+ Assert.assertEquals(t.get(1), 100);
+ Assert.assertEquals(t.get(2), 10.1f);
+ Assert.assertEquals(3, t.size());
+ }
+ else
+ {
+ Assert.fail("Failed to get data for query " + initialQuery);
+ }
+ }
+
+ private void cqlTableSchemaTest(String initialQuery) throws IOException
+ {
+ pig.registerQuery(initialQuery);
Iterator<Tuple> it = pig.openIterator("rows");
if (it.hasNext()) {
Tuple t = it.next();
@@ -103,15 +137,9 @@ public class CqlTableTest extends PigTestBase
Assert.assertEquals(t.get(3), 10.1f);
Assert.assertEquals(4, t.size());
}
-
- pig.registerQuery("rows = LOAD 'cql://cql3ks/compactcqltable?" + defaultParameters + "' USING CqlStorage();");
- it = pig.openIterator("rows");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0).toString(), "key1");
- Assert.assertEquals(t.get(1), 100);
- Assert.assertEquals(t.get(2), 10.1f);
- Assert.assertEquals(3, t.size());
+ else
+ {
+ Assert.fail("Failed to get data for query " + initialQuery);
}
}
@@ -119,8 +147,23 @@ public class CqlTableTest extends PigTestBase
public void testCqlStorageSingleKeyTable()
throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
{
+ SingleKeyTableTest("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + "' USING CqlStorage();");
+
+ }
+
+ @Test
+ public void testCqlNativeStorageSingleKeyTable()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ //input_cql=select * from moredata where token(x) > ? and token(x) <= ?
+ SingleKeyTableTest("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20moredata%20where%20token(x)%20%3E%20%3F%20and%20token(x)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+ }
+
+ private void SingleKeyTableTest(String initialQuery)
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
pig.setBatchOn();
- pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + "' USING CqlStorage();");
+ pig.registerQuery(initialQuery);
pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE TOTUPLE(TOTUPLE('a',x)),TOTUPLE(y);");
pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test?" + defaultParameters + "&output_query=UPDATE+cql3ks.test+set+b+%3D+%3F' USING CqlStorage();");
pig.executeBatch();
@@ -132,7 +175,7 @@ public class CqlTableTest extends PigTestBase
//(1,1)
pig.registerQuery("result= LOAD 'cql://cql3ks/test?" + defaultParameters + "' USING CqlStorage();");
Iterator<Tuple> it = pig.openIterator("result");
- if (it.hasNext()) {
+ while (it.hasNext()) {
Tuple t = it.next();
Assert.assertEquals(t.get(0), t.get(1));
}
@@ -142,8 +185,22 @@ public class CqlTableTest extends PigTestBase
public void testCqlStorageCompositeKeyTable()
throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
{
+ CompositeKeyTableTest("moredata= LOAD 'cql://cql3ks/compmore?" + defaultParameters + "' USING CqlStorage();");
+ }
+
+ @Test
+ public void testCqlNativeStorageCompositeKeyTable()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ //input_cql=select * from compmore where token(id) > ? and token(id) <= ?
+ CompositeKeyTableTest("moredata= LOAD 'cql://cql3ks/compmore?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compmore%20where%20token(id)%20%3E%20%3F%20and%20token(id)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+ }
+
+ private void CompositeKeyTableTest(String initialQuery)
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
pig.setBatchOn();
- pig.registerQuery("moredata= LOAD 'cql://cql3ks/compmore?" + defaultParameters + "' USING CqlStorage();");
+ pig.registerQuery(initialQuery);
pig.registerQuery("insertformat = FOREACH moredata GENERATE TOTUPLE (TOTUPLE('a',x),TOTUPLE('b',y), TOTUPLE('c',z)),TOTUPLE(data);");
pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/compotable?" + defaultParameters + "&output_query=UPDATE%20cql3ks.compotable%20SET%20d%20%3D%20%3F' USING CqlStorage();");
pig.executeBatch();
@@ -171,8 +228,22 @@ public class CqlTableTest extends PigTestBase
public void testCqlStorageCollectionColumnTable()
throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
{
+ CollectionColumnTableTest("collectiontable= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();");
+ }
+
+ @Test
+ public void testCqlNativeStorageCollectionColumnTable()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ //input_cql=select * from collectiontable where token(m) < ? and token(m) <= ?
+ CollectionColumnTableTest("collectiontable= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + nativeParameters + "&input_cql=input_cql%3Dselect%20*%20from%20collectiontable%20where%20token(m)%20%3C%20%3F%20and%20token(m)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+ }
+
+ private void CollectionColumnTableTest(String initialQuery)
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
pig.setBatchOn();
- pig.registerQuery("collectiontable= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();");
+ pig.registerQuery(initialQuery);
pig.registerQuery("recs= FOREACH collectiontable GENERATE TOTUPLE(TOTUPLE('m', m) ), TOTUPLE(TOTUPLE('map', TOTUPLE('m', 'mm'), TOTUPLE('n', 'nn')));");
pig.registerQuery("STORE recs INTO 'cql://cql3ks/collectiontable?" + defaultParameters + "&output_query=update+cql3ks.collectiontable+set+n+%3D+%3F' USING CqlStorage();");
pig.executeBatch();
@@ -183,7 +254,7 @@ public class CqlTableTest extends PigTestBase
//(book1,((m,mm),(n,nn)))
pig.registerQuery("result= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();");
Iterator<Tuple> it = pig.openIterator("result");
- while (it.hasNext()) {
+ if (it.hasNext()) {
Tuple t = it.next();
Tuple t1 = (Tuple) t.get(1);
Assert.assertEquals(t1.size(), 2);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/pig/org/apache/cassandra/pig/PigTestBase.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/PigTestBase.java b/test/pig/org/apache/cassandra/pig/PigTestBase.java
index ea06b8c..ed307f4 100644
--- a/test/pig/org/apache/cassandra/pig/PigTestBase.java
+++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java
@@ -66,6 +66,9 @@ public class PigTestBase extends SchemaLoader
protected static MiniCluster cluster;
protected static PigServer pig;
protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner";
+ protected static String nativeParameters = "&core_conns=2&max_conns=10&min_simult_reqs=3&max_simult_reqs=10&native_timeout=10000000" +
+ "&native_read_timeout=10000000&send_buff_size=4096&receive_buff_size=4096&solinger=3" +
+ "&tcp_nodelay=true&reuse_address=true&keep_alive=true&native_port=9052";
@AfterClass
public static void oneTimeTearDown() throws Exception {
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
index e114d37..3f1d5a1 100644
--- a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
+++ b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
@@ -183,6 +183,13 @@ public class ThriftColumnFamilyTest extends PigTestBase
"and comparator = LongType;"
};
+ private static String[] deleteCopyOfSomeAppTableData = { "use thriftKs;",
+ "DEL CopyOfSomeApp ['foo']",
+ "DEL CopyOfSomeApp ['bar']",
+ "DEL CopyOfSomeApp ['baz']",
+ "DEL CopyOfSomeApp ['qux']"
+ };
+
@BeforeClass
public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException,
AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException
@@ -196,7 +203,35 @@ public class ThriftColumnFamilyTest extends PigTestBase
public void testCqlStorage() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
{
//regular thrift column families
- pig.registerQuery("data = load 'cql://thriftKs/SomeApp?" + defaultParameters + "' using CqlStorage();");
+ cqlStorageTest("data = load 'cql://thriftKs/SomeApp?" + defaultParameters + "' using CqlStorage();");
+
+ //Test counter colun family
+ // This test fails for CASSANDRA-7059
+ //cqlStorageCounterTableTest("cc_data = load 'cql://thriftKs/CC?" + defaultParameters + "' using CqlStorage();");
+
+ //Test composite column family
+ cqlStorageCompositeTableTest("compo_data = load 'cql://thriftKs/Compo?" + defaultParameters + "' using CqlStorage();");
+ }
+
+ @Test
+ public void testCqlNativeStorage() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
+ {
+ //regular thrift column families
+ //input_cql=select * from "SomeApp" where token(key) > ? and token(key) <= ?
+ cqlStorageTest("data = load 'cql://thriftKs/SomeApp?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20%22SomeApp%22%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();");
+
+ //Test counter colun family
+ //input_cql=select * from "CC" where token(key) > ? and token(key) <= ?
+ cqlStorageCounterTableTest("cc_data = load 'cql://thriftKs/CC?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20%22CC%22%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();");
+
+ //Test composite column family
+ //input_cql=select * from "Compo" where token(key) > ? and token(key) <= ?
+ cqlStorageCompositeTableTest("compo_data = load 'cql://thriftKs/Compo?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20%22Compo%22%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();");
+ }
+
+ private void cqlStorageTest(String initialQuery) throws IOException
+ {
+ pig.registerQuery(initialQuery);
//(bar,3.141592653589793,1335890877,User Bar,35.0,9,15000,like)
//(baz,1.61803399,1335890877,User Baz,95.3,3,512000,dislike)
@@ -249,16 +284,18 @@ public class ThriftColumnFamilyTest extends PigTestBase
}
}
Assert.assertEquals(count, 4);
+ }
- //Test counter colun family
- pig.registerQuery("cc_data = load 'cql://thriftKs/CC?" + defaultParameters + "' using CqlStorage();");
+ private void cqlStorageCounterTableTest(String initialQuery) throws IOException
+ {
+ pig.registerQuery(initialQuery);
//(chuck,fist,1)
//(chuck,kick,3)
// {key: chararray,column1: chararray,value: long}
- it = pig.openIterator("cc_data");
- count = 0;
+ Iterator<Tuple> it = pig.openIterator("cc_data");
+ int count = 0;
while (it.hasNext()) {
count ++;
Tuple t = it.next();
@@ -268,9 +305,11 @@ public class ThriftColumnFamilyTest extends PigTestBase
Assert.assertEquals(t.get(2), 3L);
}
Assert.assertEquals(count, 2);
+ }
- //Test composite column family
- pig.registerQuery("compo_data = load 'cql://thriftKs/Compo?" + defaultParameters + "' using CqlStorage();");
+ private void cqlStorageCompositeTableTest(String initialQuery) throws IOException
+ {
+ pig.registerQuery(initialQuery);
//(kick,bruce,bruce,watch it, mate)
//(kick,bruce,lee,oww)
@@ -278,8 +317,8 @@ public class ThriftColumnFamilyTest extends PigTestBase
//(punch,bruce,lee,ouch)
//{key: chararray,column1: chararray,column2: chararray,value: chararray}
- it = pig.openIterator("compo_data");
- count = 0;
+ Iterator<Tuple> it = pig.openIterator("compo_data");
+ int count = 0;
while (it.hasNext()) {
count ++;
Tuple t = it.next();
@@ -351,7 +390,6 @@ public class ThriftColumnFamilyTest extends PigTestBase
@Test
public void testCassandraStorageFullCopy() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
{
- createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
pig.setBatchOn();
pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
//full copy
@@ -365,7 +403,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
@Test
public void testCassandraStorageSigleTupleCopy() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
{
- createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+ executeCliStatements(deleteCopyOfSomeAppTableData);
pig.setBatchOn();
pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
//sigle tuple
@@ -399,7 +437,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
@Test
public void testCassandraStorageBagOnlyCopy() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
{
- createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+ executeCliStatements(deleteCopyOfSomeAppTableData);
pig.setBatchOn();
pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
//bag only
@@ -443,7 +481,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
@Test
public void testCassandraStorageFilter() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
{
- createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+ executeCliStatements(deleteCopyOfSomeAppTableData);
pig.setBatchOn();
pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
@@ -476,7 +514,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
if (value != null)
Assert.fail();
- createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+ executeCliStatements(deleteCopyOfSomeAppTableData);
pig.setBatchOn();
pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
pig.registerQuery("dislikes_extras = FILTER rows by vote_type.value eq 'dislike' AND COUNT(columns) > 0;");
@@ -746,17 +784,16 @@ public class ThriftColumnFamilyTest extends PigTestBase
return parseType(validator).getString(got.getColumn().value);
}
- private void createColumnFamily(String ks, String cf, String statement) throws CharacterCodingException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
+ private void executeCliStatements(String[] statements) throws CharacterCodingException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
{
CliMain.connect("127.0.0.1", 9170);
try
{
- CliMain.processStatement("use " + ks + ";");
- CliMain.processStatement("drop column family " + cf + ";");
+ for (String stmt : statements)
+ CliMain.processStatement(stmt);
}
catch (Exception e)
{
}
- CliMain.processStatement(statement);
}
}