You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/10/18 16:47:13 UTC
[1/6] git commit: Add more pig unit tests for type mappings. Patch by
Alex Liu, reviewed by brandonwilliams for CASSANDRA-6197
Updated Branches:
refs/heads/cassandra-1.2 acd4f8057 -> 44f8ba4f7
refs/heads/cassandra-2.0 91bd207ff -> cf3bd6f86
refs/heads/trunk 3f8a90aa4 -> 5433525f5
Add more pig unit tests for type mappings.
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6197
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44f8ba4f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44f8ba4f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44f8ba4f
Branch: refs/heads/cassandra-1.2
Commit: 44f8ba4f702bce85265bf841daf32204e9ee8221
Parents: acd4f80
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 18 09:44:14 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 18 09:44:14 2013 -0500
----------------------------------------------------------------------
.../apache/cassandra/hadoop/pig/CqlStorage.java | 15 +-
.../cassandra/pig/CqlTableDataTypeTest.java | 422 +++++++++++++++++++
.../pig/ThriftColumnFamilyDataTypeTest.java | 181 ++++++++
3 files changed, 614 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8ba4f/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index e306005..0d01383 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -147,7 +147,7 @@ public class CqlStorage extends AbstractCassandraStorage
setMapTupleValues(tuple, position, value, validator);
return;
}
- AbstractType<?> elementValidator;
+ AbstractType elementValidator;
if (validator instanceof SetType)
elementValidator = ((SetType<?>) validator).elements;
else if (validator instanceof ListType)
@@ -159,7 +159,7 @@ public class CqlStorage extends AbstractCassandraStorage
Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
for (Object entry : (Collection<?>) value)
{
- setTupleValue(innerTuple, i, entry, elementValidator);
+ setTupleValue(innerTuple, i, cassandraToPigData(entry, elementValidator), elementValidator);
i++;
}
tuple.set(position, innerTuple);
@@ -176,8 +176,8 @@ public class CqlStorage extends AbstractCassandraStorage
for(Map.Entry<?,?> entry : ((Map<Object, Object>)value).entrySet())
{
Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
- setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
- setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+ setTupleValue(mapEntryTuple, 0, cassandraToPigData(entry.getKey(), keyValidator), keyValidator);
+ setTupleValue(mapEntryTuple, 1, cassandraToPigData(entry.getValue(), valueValidator), valueValidator);
innerTuple.set(i, mapEntryTuple);
i++;
}
@@ -703,5 +703,12 @@ public class CqlStorage extends AbstractCassandraStorage
throw new IOException("Unsupported expression type: " + opString);
}
}
+
+ private Object cassandraToPigData(Object obj, AbstractType validator)
+ {
+ if (validator instanceof DecimalType || validator instanceof InetAddressType)
+ return validator.getString(validator.decompose(obj));
+ return obj;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8ba4f/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java b/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java
new file mode 100644
index 0000000..a0f1f47
--- /dev/null
+++ b/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java
@@ -0,0 +1,422 @@
+package org.apache.cassandra.pig;
+
+import java.io.IOException;
+import java.nio.charset.CharacterCodingException;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.thrift.AuthenticationException;
+import org.apache.cassandra.thrift.AuthorizationException;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.SchemaDisagreementException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.Hex;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class CqlTableDataTypeTest extends PigTestBase
+{
+ //ASCII (AsciiType.instance),
+ //BIGINT (LongType.instance),
+ //BLOB (BytesType.instance),
+ //BOOLEAN (BooleanType.instance),
+ //COUNTER (CounterColumnType.instance),
+ //DECIMAL (DecimalType.instance),
+ //DOUBLE (DoubleType.instance),
+ //FLOAT (FloatType.instance),
+ //INET (InetAddressType.instance),
+ //INT (Int32Type.instance),
+ //TEXT (UTF8Type.instance),
+ //TIMESTAMP(DateType.instance),
+ //UUID (UUIDType.instance),
+ //VARCHAR (UTF8Type.instance),
+ //VARINT (IntegerType.instance),
+ //TIMEUUID (TimeUUIDType.instance);
+ //SET
+ //LIST
+ //MAP
+ //Create table to test the above data types
+ private static String[] statements = {
+ "CREATE KEYSPACE cql3ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}",
+ "USE cql3ks;",
+
+ "CREATE TABLE cqltable (" +
+ "key int primary key," +
+ "col_ascii ascii," +
+ "col_bigint bigint," +
+ "col_blob blob," +
+ "col_boolean boolean," +
+ "col_decimal decimal," +
+ "col_double double," +
+ "col_float float," +
+ "col_inet inet," +
+ "col_int int," +
+ "col_text text," +
+ "col_timestamp timestamp," +
+ "col_uuid uuid," +
+ "col_varchar varchar," +
+ "col_varint varint," +
+ "col_timeuuid timeuuid);",
+
+ "CREATE TABLE settable (" +
+ "key int primary key," +
+ "col_set_ascii set<ascii>," +
+ "col_set_bigint set<bigint>," +
+ "col_set_blob set<blob>," +
+ "col_set_boolean set<boolean>," +
+ "col_set_decimal set<decimal>," +
+ "col_set_double set<double>," +
+ "col_set_float set<float>," +
+ "col_set_inet set<inet>," +
+ "col_set_int set<int>," +
+ "col_set_text set<text>," +
+ "col_set_timestamp set<timestamp>," +
+ "col_set_uuid set<uuid>," +
+ "col_set_varchar set<varchar>," +
+ "col_set_varint set<varint>," +
+ "col_set_timeuuid set<timeuuid>);",
+
+ "CREATE TABLE listtable (" +
+ "key int primary key," +
+ "col_list_ascii list<ascii>," +
+ "col_list_bigint list<bigint>," +
+ "col_list_blob list<blob>," +
+ "col_list_boolean list<boolean>," +
+ "col_list_decimal list<decimal>," +
+ "col_list_double list<double>," +
+ "col_list_float list<float>," +
+ "col_list_inet list<inet>," +
+ "col_list_int list<int>," +
+ "col_list_text list<text>," +
+ "col_list_timestamp list<timestamp>," +
+ "col_list_uuid list<uuid>," +
+ "col_list_varchar list<varchar>," +
+ "col_list_varint list<varint>," +
+ "col_list_timeuuid list<timeuuid>);",
+
+ "CREATE TABLE maptable (" +
+ "key int primary key," +
+ "col_map_ascii map<ascii, ascii>," +
+ "col_map_bigint map<bigint, bigint>," +
+ "col_map_blob map<blob, blob>," +
+ "col_map_boolean map<boolean, boolean>," +
+ "col_map_decimal map<decimal, decimal>," +
+ "col_map_double map<double, double>," +
+ "col_map_float map<float, float>," +
+ "col_map_inet map<inet, inet>," +
+ "col_map_int map<int, int>," +
+ "col_map_text map<text, text>," +
+ "col_map_timestamp map<timestamp, timestamp>," +
+ "col_map_uuid map<uuid, uuid>," +
+ "col_map_varchar map<varchar, varchar>," +
+ "col_map_varint map<varint, varint>," +
+ "col_map_timeuuid map<timeuuid, timeuuid>);",
+
+ "INSERT INTO cqltable(key, col_ascii) VALUES (1, 'ascii');",
+ "INSERT INTO cqltable(key, col_bigint) VALUES (1, 12345678);",
+ "INSERT INTO cqltable(key, col_blob) VALUES (1, 0x23446c6c6f);",
+ "INSERT INTO cqltable(key, col_boolean) VALUES (1, false);",
+ "INSERT INTO cqltable(key, col_decimal) VALUES (1, 23.4567);",
+ "INSERT INTO cqltable(key, col_double) VALUES (1, 12345678.12345678);",
+ "INSERT INTO cqltable(key, col_float) VALUES (1, 123.12);",
+ "INSERT INTO cqltable(key, col_inet) VALUES (1, '127.0.0.1');",
+ "INSERT INTO cqltable(key, col_int) VALUES (1, 123);",
+ "INSERT INTO cqltable(key, col_text) VALUES (1, 'text');",
+ "INSERT INTO cqltable(key, col_timestamp) VALUES (1, '2011-02-03T04:05:00+0000');",
+ "INSERT INTO cqltable(key, col_timeuuid) VALUES (1, maxTimeuuid('2013-01-01 00:05+0000'));",
+ "INSERT INTO cqltable(key, col_uuid) VALUES (1, 550e8400-e29b-41d4-a716-446655440000);",
+ "INSERT INTO cqltable(key, col_varchar) VALUES (1, 'varchar');",
+ "INSERT INTO cqltable(key, col_varint) VALUES (1, 123);",
+
+ "INSERT INTO settable(key, col_set_ascii) VALUES (1, {'ascii1', 'ascii2'});",
+ "INSERT INTO settable(key, col_set_bigint) VALUES (1, {12345678, 12345679});",
+ "INSERT INTO settable(key, col_set_blob) VALUES (1, {0x68656c6c6f, 0x68656c6c6e});",
+ "INSERT INTO settable(key, col_set_boolean) VALUES (1, {false, true});",
+ "INSERT INTO settable(key, col_set_decimal) VALUES (1, {23.4567, 23.4568});",
+ "INSERT INTO settable(key, col_set_double) VALUES (1, {12345678.12345678, 12345678.12345679});",
+ "INSERT INTO settable(key, col_set_float) VALUES (1, {123.12, 123.13});",
+ "INSERT INTO settable(key, col_set_inet) VALUES (1, {'127.0.0.1', '127.0.0.2'});",
+ "INSERT INTO settable(key, col_set_int) VALUES (1, {123, 124});",
+ "INSERT INTO settable(key, col_set_text) VALUES (1, {'text1', 'text2'});",
+ "INSERT INTO settable(key, col_set_timestamp) VALUES (1, {'2011-02-03T04:05:00+0000', '2011-02-04T04:05:00+0000'});",
+ "INSERT INTO settable(key, col_set_timeuuid) VALUES (1, {e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f, e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77});",
+ "INSERT INTO settable(key, col_set_uuid) VALUES (1, {550e8400-e29b-41d4-a716-446655440000, 550e8400-e29b-41d4-a716-446655440001});",
+ "INSERT INTO settable(key, col_set_varchar) VALUES (1, {'varchar1', 'varchar2'});",
+ "INSERT INTO settable(key, col_set_varint) VALUES (1, {123, 124});",
+
+ "INSERT INTO listtable(key, col_list_ascii) VALUES (1, ['ascii2', 'ascii1']);",
+ "INSERT INTO listtable(key, col_list_bigint) VALUES (1, [12345679, 12345678]);",
+ "INSERT INTO listtable(key, col_list_blob) VALUES (1, [0x68656c6c6e, 0x68656c6c6f]);",
+ "INSERT INTO listtable(key, col_list_boolean) VALUES (1, [true, false]);",
+ "INSERT INTO listtable(key, col_list_decimal) VALUES (1, [23.4568, 23.4567]);",
+ "INSERT INTO listtable(key, col_list_double) VALUES (1, [12345678.12345679, 12345678.12345678]);",
+ "INSERT INTO listtable(key, col_list_float) VALUES (1, [123.13, 123.12]);",
+ "INSERT INTO listtable(key, col_list_inet) VALUES (1, ['127.0.0.2', '127.0.0.1']);",
+ "INSERT INTO listtable(key, col_list_int) VALUES (1, [124, 123]);",
+ "INSERT INTO listtable(key, col_list_text) VALUES (1, ['text2', 'text1']);",
+ "INSERT INTO listtable(key, col_list_timestamp) VALUES (1, ['2011-02-04T04:05:00+0000', '2011-02-03T04:05:00+0000']);",
+ "INSERT INTO listtable(key, col_list_timeuuid) VALUES (1, [e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77, e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f]);",
+ "INSERT INTO listtable(key, col_list_uuid) VALUES (1, [550e8400-e29b-41d4-a716-446655440001, 550e8400-e29b-41d4-a716-446655440000]);",
+ "INSERT INTO listtable(key, col_list_varchar) VALUES (1, ['varchar2', 'varchar1']);",
+ "INSERT INTO listtable(key, col_list_varint) VALUES (1, [124, 123]);",
+
+ "INSERT INTO maptable(key, col_map_ascii) VALUES (1, {'ascii1' : 'ascii2'});",
+ "INSERT INTO maptable(key, col_map_bigint) VALUES (1, {12345678 : 12345679});",
+ "INSERT INTO maptable(key, col_map_blob) VALUES (1, {0x68656c6c6f : 0x68656c6c6e});",
+ "INSERT INTO maptable(key, col_map_boolean) VALUES (1, {false : true});",
+ "INSERT INTO maptable(key, col_map_decimal) VALUES (1, {23.4567 : 23.4568});",
+ "INSERT INTO maptable(key, col_map_double) VALUES (1, {12345678.12345678 : 12345678.12345679});",
+ "INSERT INTO maptable(key, col_map_float) VALUES (1, {123.12 : 123.13});",
+ "INSERT INTO maptable(key, col_map_inet) VALUES (1, {'127.0.0.1' : '127.0.0.2'});",
+ "INSERT INTO maptable(key, col_map_int) VALUES (1, {123 : 124});",
+ "INSERT INTO maptable(key, col_map_text) VALUES (1, {'text1' : 'text2'});",
+ "INSERT INTO maptable(key, col_map_timestamp) VALUES (1, {'2011-02-03T04:05:00+0000' : '2011-02-04T04:05:00+0000'});",
+ "INSERT INTO maptable(key, col_map_timeuuid) VALUES (1, {e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f : e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77});",
+ "INSERT INTO maptable(key, col_map_uuid) VALUES (1, {550e8400-e29b-41d4-a716-446655440000 : 550e8400-e29b-41d4-a716-446655440001});",
+ "INSERT INTO maptable(key, col_map_varchar) VALUES (1, {'varchar1' : 'varchar2'});",
+ "INSERT INTO maptable(key, col_map_varint) VALUES (1, {123 : 124});",
+
+ "CREATE TABLE countertable (key int primary key, col_counter counter);",
+ "UPDATE countertable SET col_counter = col_counter + 3 WHERE key = 1;",
+ };
+
+ @BeforeClass
+ public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException,
+ AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException
+ {
+ startCassandra();
+ setupDataByCql(statements);
+ startHadoopCluster();
+ }
+
+ @Test
+ public void testCqlStorageRegularType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();");
+ Iterator<Tuple> it = pig.openIterator("rows");
+ //{key: int,
+ //col_ascii: chararray,
+ //col_bigint: long,
+ //col_blob: bytearray,
+ //col_boolean: bytearray,
+ //col_decimal: chararray,
+ //col_double: double,
+ //col_float: float,
+ //col_inet: chararray,
+ //col_int: int,
+ //col_text: chararray,
+ //col_timestamp: long,
+ //col_timeuuid: bytearray,
+ //col_uuid: chararray,
+ //col_varchar: chararray,
+ //col_varint: int}
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Assert.assertEquals(t.get(1), "ascii");
+ Assert.assertEquals(t.get(2), 12345678L);
+ Assert.assertEquals(t.get(3), new DataByteArray(Hex.hexToBytes("23446c6c6f")));
+ Assert.assertEquals(t.get(4), false);
+ Assert.assertEquals(t.get(5), "23.4567");
+ Assert.assertEquals(t.get(6), 12345678.12345678d);
+ Assert.assertEquals(t.get(7), 123.12f);
+ Assert.assertEquals(t.get(8), "127.0.0.1");
+ Assert.assertEquals(t.get(9), 123);
+ Assert.assertEquals(t.get(10), "text");
+ Assert.assertEquals(t.get(11), 1296705900000L);
+ Assert.assertEquals(t.get(12), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ Assert.assertEquals(t.get(13), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ Assert.assertEquals(t.get(14), "varchar");
+ Assert.assertEquals(t.get(15), 123);
+ }
+
+ pig.registerQuery("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + "' USING CqlStorage();");
+ it = pig.openIterator("cc_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Assert.assertEquals(t.get(1), 3L);
+ }
+ }
+
+ @Test
+ public void testCqlStorageSetType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ pig.registerQuery("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + "' USING CqlStorage();");
+ Iterator<Tuple> it = pig.openIterator("set_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Tuple innerTuple = (Tuple) t.get(1);
+ Assert.assertEquals(innerTuple.get(0), "ascii1");
+ Assert.assertEquals(innerTuple.get(1), "ascii2");
+ innerTuple = (Tuple) t.get(2);
+ Assert.assertEquals(innerTuple.get(0), 12345678L);
+ Assert.assertEquals(innerTuple.get(1), 12345679L);
+ innerTuple = (Tuple) t.get(3);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
+ innerTuple = (Tuple) t.get(4);
+ Assert.assertEquals(innerTuple.get(0), false);
+ Assert.assertEquals(innerTuple.get(1), true);
+ innerTuple = (Tuple) t.get(5);
+ Assert.assertEquals(innerTuple.get(0), "23.4567");
+ Assert.assertEquals(innerTuple.get(1), "23.4568");
+ innerTuple = (Tuple) t.get(6);
+ Assert.assertEquals(innerTuple.get(0), 12345678.12345678d);
+ Assert.assertEquals(innerTuple.get(1), 12345678.12345679d);
+ innerTuple = (Tuple) t.get(7);
+ Assert.assertEquals(innerTuple.get(0), 123.12f);
+ Assert.assertEquals(innerTuple.get(1), 123.13f);
+ innerTuple = (Tuple) t.get(8);
+ Assert.assertEquals(innerTuple.get(0), "127.0.0.1");
+ Assert.assertEquals(innerTuple.get(1), "127.0.0.2");
+ innerTuple = (Tuple) t.get(9);
+ Assert.assertEquals(innerTuple.get(0), 123);
+ Assert.assertEquals(innerTuple.get(1), 124);
+ innerTuple = (Tuple) t.get(10);
+ Assert.assertEquals(innerTuple.get(0), "text1");
+ Assert.assertEquals(innerTuple.get(1), "text2");
+ innerTuple = (Tuple) t.get(11);
+ Assert.assertEquals(innerTuple.get(0), 1296705900000L);
+ Assert.assertEquals(innerTuple.get(1), 1296792300000L);
+ innerTuple = (Tuple) t.get(12);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ innerTuple = (Tuple) t.get(13);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
+ innerTuple = (Tuple) t.get(14);
+ Assert.assertEquals(innerTuple.get(0), "varchar1");
+ Assert.assertEquals(innerTuple.get(1), "varchar2");
+ innerTuple = (Tuple) t.get(15);
+ Assert.assertEquals(innerTuple.get(0), 123);
+ Assert.assertEquals(innerTuple.get(1), 124);
+ }
+ }
+
+ @Test
+ public void testCqlStorageListType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ pig.registerQuery("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + "' USING CqlStorage();");
+ Iterator<Tuple> it = pig.openIterator("list_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Tuple innerTuple = (Tuple) t.get(1);
+ Assert.assertEquals(innerTuple.get(1), "ascii1");
+ Assert.assertEquals(innerTuple.get(0), "ascii2");
+ innerTuple = (Tuple) t.get(2);
+ Assert.assertEquals(innerTuple.get(1), 12345678L);
+ Assert.assertEquals(innerTuple.get(0), 12345679L);
+ innerTuple = (Tuple) t.get(3);
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
+ innerTuple = (Tuple) t.get(4);
+ Assert.assertEquals(innerTuple.get(1), false);
+ Assert.assertEquals(innerTuple.get(0), true);
+ innerTuple = (Tuple) t.get(5);
+ Assert.assertEquals(innerTuple.get(1), "23.4567");
+ Assert.assertEquals(innerTuple.get(0), "23.4568");
+ innerTuple = (Tuple) t.get(6);
+ Assert.assertEquals(innerTuple.get(1), 12345678.12345678d);
+ Assert.assertEquals(innerTuple.get(0), 12345678.12345679d);
+ innerTuple = (Tuple) t.get(7);
+ Assert.assertEquals(innerTuple.get(1), 123.12f);
+ Assert.assertEquals(innerTuple.get(0), 123.13f);
+ innerTuple = (Tuple) t.get(8);
+ Assert.assertEquals(innerTuple.get(1), "127.0.0.1");
+ Assert.assertEquals(innerTuple.get(0), "127.0.0.2");
+ innerTuple = (Tuple) t.get(9);
+ Assert.assertEquals(innerTuple.get(1), 123);
+ Assert.assertEquals(innerTuple.get(0), 124);
+ innerTuple = (Tuple) t.get(10);
+ Assert.assertEquals(innerTuple.get(1), "text1");
+ Assert.assertEquals(innerTuple.get(0), "text2");
+ innerTuple = (Tuple) t.get(11);
+ Assert.assertEquals(innerTuple.get(1), 1296705900000L);
+ Assert.assertEquals(innerTuple.get(0), 1296792300000L);
+ innerTuple = (Tuple) t.get(12);
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
+ innerTuple = (Tuple) t.get(13);
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
+ innerTuple = (Tuple) t.get(14);
+ Assert.assertEquals(innerTuple.get(1), "varchar1");
+ Assert.assertEquals(innerTuple.get(0), "varchar2");
+ innerTuple = (Tuple) t.get(15);
+ Assert.assertEquals(innerTuple.get(1), 123);
+ Assert.assertEquals(innerTuple.get(0), 124);
+ }
+ }
+
+ @Test
+ public void testCqlStorageMapType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ pig.registerQuery("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + "' USING CqlStorage();");
+ Iterator<Tuple> it = pig.openIterator("map_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Tuple innerTuple = (Tuple) ((Tuple) t.get(1)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "ascii1");
+ Assert.assertEquals(innerTuple.get(1), "ascii2");
+ innerTuple = (Tuple) ((Tuple) t.get(2)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 12345678L);
+ Assert.assertEquals(innerTuple.get(1), 12345679L);
+ innerTuple = (Tuple) ((Tuple) t.get(3)).get(0);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
+ innerTuple = (Tuple) ((Tuple) t.get(4)).get(0);
+ Assert.assertEquals(innerTuple.get(0), false);
+ Assert.assertEquals(innerTuple.get(1), true);
+ innerTuple = (Tuple) ((Tuple) t.get(5)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "23.4567");
+ Assert.assertEquals(innerTuple.get(1), "23.4568");
+ innerTuple = (Tuple) ((Tuple) t.get(6)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 12345678.12345678d);
+ Assert.assertEquals(innerTuple.get(1), 12345678.12345679d);
+ innerTuple = (Tuple) ((Tuple) t.get(7)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 123.12f);
+ Assert.assertEquals(innerTuple.get(1), 123.13f);
+ innerTuple = (Tuple) ((Tuple) t.get(8)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "127.0.0.1");
+ Assert.assertEquals(innerTuple.get(1), "127.0.0.2");
+ innerTuple = (Tuple) ((Tuple) t.get(9)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 123);
+ Assert.assertEquals(innerTuple.get(1), 124);
+ innerTuple = (Tuple) ((Tuple) t.get(10)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "text1");
+ Assert.assertEquals(innerTuple.get(1), "text2");
+ innerTuple = (Tuple) ((Tuple) t.get(11)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 1296705900000L);
+ Assert.assertEquals(innerTuple.get(1), 1296792300000L);
+ innerTuple = (Tuple) ((Tuple) t.get(12)).get(0);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
+ innerTuple = (Tuple) ((Tuple) t.get(13)).get(0);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
+ innerTuple = (Tuple) ((Tuple) t.get(14)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "varchar1");
+ Assert.assertEquals(innerTuple.get(1), "varchar2");
+ innerTuple = (Tuple) ((Tuple) t.get(15)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 123);
+ Assert.assertEquals(innerTuple.get(1), 124);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8ba4f/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java b/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
new file mode 100644
index 0000000..2f97549
--- /dev/null
+++ b/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
@@ -0,0 +1,181 @@
+package org.apache.cassandra.pig;
+
+import java.io.IOException;
+import java.nio.charset.CharacterCodingException;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.thrift.AuthenticationException;
+import org.apache.cassandra.thrift.AuthorizationException;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.Hex;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThriftColumnFamilyDataTypeTest extends PigTestBase
+{
+ //AsciiType
+ //LongType
+ //BytesType
+ //BooleanType
+ //CounterColumnType
+ //DecimalType
+ //DoubleType
+ //FloatType
+ //InetAddressType
+ //Int32Type
+ //UTF8Type
+ //DateType
+ //UUIDType
+ //IntegerType
+ //TimeUUIDType
+ //IntegerType
+ //LexicalUUIDType
+ private static String[] statements = {
+ "create keyspace thriftKs with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and" +
+ " strategy_options={replication_factor:1};",
+ "use thriftKs;",
+
+ "create column family SomeApp " +
+ " with comparator = UTF8Type " +
+ " and default_validation_class = UTF8Type " +
+ " and key_validation_class = UTF8Type " +
+ " and column_metadata = [" +
+ "{column_name: col_ascii, validation_class: AsciiType}, " +
+ "{column_name: col_long, validation_class: LongType}, " +
+ "{column_name: col_bytes, validation_class: BytesType}, " +
+ "{column_name: col_boolean, validation_class: BooleanType}, " +
+ "{column_name: col_decimal, validation_class: DecimalType}, " +
+ "{column_name: col_double, validation_class: DoubleType}, " +
+ "{column_name: col_float, validation_class: FloatType}," +
+ "{column_name: col_inetaddress, validation_class: InetAddressType}, " +
+ "{column_name: col_int32, validation_class: Int32Type}, " +
+ "{column_name: col_uft8, validation_class: UTF8Type}, " +
+ "{column_name: col_date, validation_class: DateType}, " +
+ "{column_name: col_uuid, validation_class: UUIDType}, " +
+ "{column_name: col_integer, validation_class: IntegerType}, " +
+ "{column_name: col_timeuuid, validation_class: TimeUUIDType}, " +
+ "{column_name: col_lexical_uuid, validation_class: LexicalUUIDType}, " +
+ "]; ",
+
+ "set SomeApp['foo']['col_ascii'] = 'ascii';",
+ "set SomeApp['foo']['col_boolean'] = false;",
+ "set SomeApp['foo']['col_bytes'] = 'DEADBEEF';",
+ "set SomeApp['foo']['col_date'] = '2011-02-03T04:05:00+0000';",
+ "set SomeApp['foo']['col_decimal'] = '23.345';",
+ "set SomeApp['foo']['col_double'] = '2.7182818284590451';",
+ "set SomeApp['foo']['col_float'] = '23.45';",
+ "set SomeApp['foo']['col_inetaddress'] = '127.0.0.1';",
+ "set SomeApp['foo']['col_int32'] = 23;",
+ "set SomeApp['foo']['col_integer'] = 12345;",
+ "set SomeApp['foo']['col_long'] = 12345678;",
+ "set SomeApp['foo']['col_lexical_uuid'] = 'e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77';",
+ "set SomeApp['foo']['col_timeuuid'] = 'e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f';",
+ "set SomeApp['foo']['col_uft8'] = 'hello';",
+ "set SomeApp['foo']['col_uuid'] = '550e8400-e29b-41d4-a716-446655440000';",
+
+ "create column family CC with " +
+ "key_validation_class = UTF8Type and " +
+ "default_validation_class=CounterColumnType " +
+ "and comparator=UTF8Type;",
+
+ "incr CC['chuck']['kick'];",
+ "incr CC['chuck']['kick'];",
+ "incr CC['chuck']['kick'];"
+ };
+
+ @BeforeClass
+ public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException,
+ AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException
+ {
+ startCassandra();
+ setupDataByCli(statements);
+ startHadoopCluster();
+ }
+
+ @Test
+ public void testCassandraStorageDataType() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
+ {
+ pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
+
+ //{key: chararray, col_ascii: (name: chararray,value: chararray),
+ //col_boolean: (name: chararray,value: bytearray),
+ //col_bytes: (name: chararray,value: bytearray),
+ //col_date: (name: chararray,value: long),
+ //col_decimal: (name: chararray,value: chararray),
+ //col_double: (name: chararray,value: double),
+ //col_float: (name: chararray,value: float),
+ //col_inetaddress: (name: chararray,value: chararray),
+ //col_int32: (name: chararray,value: int),
+ //col_integer: (name: chararray,value: int),
+ //col_lexical_uuid: (name: chararray,value: chararray),
+ //col_long: (name: chararray,value: long),
+ //col_timeuuid: (name: chararray,value: bytearray),
+ //col_uft8: (name: chararray,value: chararray),
+ //col_uuid: (name: chararray,value: chararray),
+ //columns: {(name: chararray,value: chararray)}}
+ Iterator<Tuple> it = pig.openIterator("rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), "foo");
+ Tuple column = (Tuple) t.get(1);
+ Assert.assertEquals(column.get(1), "ascii");
+ column = (Tuple) t.get(2);
+ Assert.assertEquals(column.get(1), false);
+ column = (Tuple) t.get(3);
+ Assert.assertEquals(column.get(1), new DataByteArray(Hex.hexToBytes("DEADBEEF")));
+ column = (Tuple) t.get(4);
+ Assert.assertEquals(column.get(1), 1296705900000L);
+ column = (Tuple) t.get(5);
+ Assert.assertEquals(column.get(1), "23.345");
+ column = (Tuple) t.get(6);
+ Assert.assertEquals(column.get(1), 2.7182818284590451d);
+ column = (Tuple) t.get(7);
+ Assert.assertEquals(column.get(1), 23.45f);
+ column = (Tuple) t.get(8);
+ Assert.assertEquals(column.get(1), "127.0.0.1");
+ column = (Tuple) t.get(9);
+ Assert.assertEquals(column.get(1), 23);
+ column = (Tuple) t.get(10);
+ Assert.assertEquals(column.get(1), 12345);
+ column = (Tuple) t.get(11);
+ Assert.assertEquals(column.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
+ column = (Tuple) t.get(12);
+ Assert.assertEquals(column.get(1), 12345678L);
+ column = (Tuple) t.get(13);
+ Assert.assertEquals(column.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ column = (Tuple) t.get(14);
+ Assert.assertEquals(column.get(1), "hello");
+ column = (Tuple) t.get(15);
+ Assert.assertEquals(column.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ }
+
+ pig.registerQuery("cc_rows = LOAD 'cassandra://thriftKs/CC?" + defaultParameters + "' USING CassandraStorage();");
+
+ //(chuck,{(kick,3)})
+ it = pig.openIterator("cc_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), "chuck");
+ DataBag columns = (DataBag) t.get(1);
+ Iterator<Tuple> iter = columns.iterator();
+ if(iter.hasNext())
+ {
+ Tuple column = iter.next();
+ Assert.assertEquals(column.get(0), "kick");
+ Assert.assertEquals(column.get(1), 3L);
+ }
+ }
+ }
+}
[2/6] git commit: Add more pig unit tests for type mappings. Patch by
Alex Liu, reviewed by brandonwilliams for CASSANDRA-6197
Posted by br...@apache.org.
Add more pig unit tests for type mappings.
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6197
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44f8ba4f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44f8ba4f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44f8ba4f
Branch: refs/heads/cassandra-2.0
Commit: 44f8ba4f702bce85265bf841daf32204e9ee8221
Parents: acd4f80
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 18 09:44:14 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 18 09:44:14 2013 -0500
----------------------------------------------------------------------
.../apache/cassandra/hadoop/pig/CqlStorage.java | 15 +-
.../cassandra/pig/CqlTableDataTypeTest.java | 422 +++++++++++++++++++
.../pig/ThriftColumnFamilyDataTypeTest.java | 181 ++++++++
3 files changed, 614 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8ba4f/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index e306005..0d01383 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -147,7 +147,7 @@ public class CqlStorage extends AbstractCassandraStorage
setMapTupleValues(tuple, position, value, validator);
return;
}
- AbstractType<?> elementValidator;
+ AbstractType elementValidator;
if (validator instanceof SetType)
elementValidator = ((SetType<?>) validator).elements;
else if (validator instanceof ListType)
@@ -159,7 +159,7 @@ public class CqlStorage extends AbstractCassandraStorage
Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
for (Object entry : (Collection<?>) value)
{
- setTupleValue(innerTuple, i, entry, elementValidator);
+ setTupleValue(innerTuple, i, cassandraToPigData(entry, elementValidator), elementValidator);
i++;
}
tuple.set(position, innerTuple);
@@ -176,8 +176,8 @@ public class CqlStorage extends AbstractCassandraStorage
for(Map.Entry<?,?> entry : ((Map<Object, Object>)value).entrySet())
{
Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
- setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
- setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+ setTupleValue(mapEntryTuple, 0, cassandraToPigData(entry.getKey(), keyValidator), keyValidator);
+ setTupleValue(mapEntryTuple, 1, cassandraToPigData(entry.getValue(), valueValidator), valueValidator);
innerTuple.set(i, mapEntryTuple);
i++;
}
@@ -703,5 +703,12 @@ public class CqlStorage extends AbstractCassandraStorage
throw new IOException("Unsupported expression type: " + opString);
}
}
+
+ private Object cassandraToPigData(Object obj, AbstractType validator)
+ {
+ if (validator instanceof DecimalType || validator instanceof InetAddressType)
+ return validator.getString(validator.decompose(obj));
+ return obj;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8ba4f/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java b/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java
new file mode 100644
index 0000000..a0f1f47
--- /dev/null
+++ b/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java
@@ -0,0 +1,422 @@
+package org.apache.cassandra.pig;
+
+import java.io.IOException;
+import java.nio.charset.CharacterCodingException;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.thrift.AuthenticationException;
+import org.apache.cassandra.thrift.AuthorizationException;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.SchemaDisagreementException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.Hex;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class CqlTableDataTypeTest extends PigTestBase
+{
+ //ASCII (AsciiType.instance),
+ //BIGINT (LongType.instance),
+ //BLOB (BytesType.instance),
+ //BOOLEAN (BooleanType.instance),
+ //COUNTER (CounterColumnType.instance),
+ //DECIMAL (DecimalType.instance),
+ //DOUBLE (DoubleType.instance),
+ //FLOAT (FloatType.instance),
+ //INET (InetAddressType.instance),
+ //INT (Int32Type.instance),
+ //TEXT (UTF8Type.instance),
+ //TIMESTAMP(DateType.instance),
+ //UUID (UUIDType.instance),
+ //VARCHAR (UTF8Type.instance),
+ //VARINT (IntegerType.instance),
+ //TIMEUUID (TimeUUIDType.instance);
+ //SET
+ //LIST
+ //MAP
+ //Create table to test the above data types
+ private static String[] statements = {
+ "CREATE KEYSPACE cql3ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}",
+ "USE cql3ks;",
+
+ "CREATE TABLE cqltable (" +
+ "key int primary key," +
+ "col_ascii ascii," +
+ "col_bigint bigint," +
+ "col_blob blob," +
+ "col_boolean boolean," +
+ "col_decimal decimal," +
+ "col_double double," +
+ "col_float float," +
+ "col_inet inet," +
+ "col_int int," +
+ "col_text text," +
+ "col_timestamp timestamp," +
+ "col_uuid uuid," +
+ "col_varchar varchar," +
+ "col_varint varint," +
+ "col_timeuuid timeuuid);",
+
+ "CREATE TABLE settable (" +
+ "key int primary key," +
+ "col_set_ascii set<ascii>," +
+ "col_set_bigint set<bigint>," +
+ "col_set_blob set<blob>," +
+ "col_set_boolean set<boolean>," +
+ "col_set_decimal set<decimal>," +
+ "col_set_double set<double>," +
+ "col_set_float set<float>," +
+ "col_set_inet set<inet>," +
+ "col_set_int set<int>," +
+ "col_set_text set<text>," +
+ "col_set_timestamp set<timestamp>," +
+ "col_set_uuid set<uuid>," +
+ "col_set_varchar set<varchar>," +
+ "col_set_varint set<varint>," +
+ "col_set_timeuuid set<timeuuid>);",
+
+ "CREATE TABLE listtable (" +
+ "key int primary key," +
+ "col_list_ascii list<ascii>," +
+ "col_list_bigint list<bigint>," +
+ "col_list_blob list<blob>," +
+ "col_list_boolean list<boolean>," +
+ "col_list_decimal list<decimal>," +
+ "col_list_double list<double>," +
+ "col_list_float list<float>," +
+ "col_list_inet list<inet>," +
+ "col_list_int list<int>," +
+ "col_list_text list<text>," +
+ "col_list_timestamp list<timestamp>," +
+ "col_list_uuid list<uuid>," +
+ "col_list_varchar list<varchar>," +
+ "col_list_varint list<varint>," +
+ "col_list_timeuuid list<timeuuid>);",
+
+ "CREATE TABLE maptable (" +
+ "key int primary key," +
+ "col_map_ascii map<ascii, ascii>," +
+ "col_map_bigint map<bigint, bigint>," +
+ "col_map_blob map<blob, blob>," +
+ "col_map_boolean map<boolean, boolean>," +
+ "col_map_decimal map<decimal, decimal>," +
+ "col_map_double map<double, double>," +
+ "col_map_float map<float, float>," +
+ "col_map_inet map<inet, inet>," +
+ "col_map_int map<int, int>," +
+ "col_map_text map<text, text>," +
+ "col_map_timestamp map<timestamp, timestamp>," +
+ "col_map_uuid map<uuid, uuid>," +
+ "col_map_varchar map<varchar, varchar>," +
+ "col_map_varint map<varint, varint>," +
+ "col_map_timeuuid map<timeuuid, timeuuid>);",
+
+ "INSERT INTO cqltable(key, col_ascii) VALUES (1, 'ascii');",
+ "INSERT INTO cqltable(key, col_bigint) VALUES (1, 12345678);",
+ "INSERT INTO cqltable(key, col_blob) VALUES (1, 0x23446c6c6f);",
+ "INSERT INTO cqltable(key, col_boolean) VALUES (1, false);",
+ "INSERT INTO cqltable(key, col_decimal) VALUES (1, 23.4567);",
+ "INSERT INTO cqltable(key, col_double) VALUES (1, 12345678.12345678);",
+ "INSERT INTO cqltable(key, col_float) VALUES (1, 123.12);",
+ "INSERT INTO cqltable(key, col_inet) VALUES (1, '127.0.0.1');",
+ "INSERT INTO cqltable(key, col_int) VALUES (1, 123);",
+ "INSERT INTO cqltable(key, col_text) VALUES (1, 'text');",
+ "INSERT INTO cqltable(key, col_timestamp) VALUES (1, '2011-02-03T04:05:00+0000');",
+ "INSERT INTO cqltable(key, col_timeuuid) VALUES (1, maxTimeuuid('2013-01-01 00:05+0000'));",
+ "INSERT INTO cqltable(key, col_uuid) VALUES (1, 550e8400-e29b-41d4-a716-446655440000);",
+ "INSERT INTO cqltable(key, col_varchar) VALUES (1, 'varchar');",
+ "INSERT INTO cqltable(key, col_varint) VALUES (1, 123);",
+
+ "INSERT INTO settable(key, col_set_ascii) VALUES (1, {'ascii1', 'ascii2'});",
+ "INSERT INTO settable(key, col_set_bigint) VALUES (1, {12345678, 12345679});",
+ "INSERT INTO settable(key, col_set_blob) VALUES (1, {0x68656c6c6f, 0x68656c6c6e});",
+ "INSERT INTO settable(key, col_set_boolean) VALUES (1, {false, true});",
+ "INSERT INTO settable(key, col_set_decimal) VALUES (1, {23.4567, 23.4568});",
+ "INSERT INTO settable(key, col_set_double) VALUES (1, {12345678.12345678, 12345678.12345679});",
+ "INSERT INTO settable(key, col_set_float) VALUES (1, {123.12, 123.13});",
+ "INSERT INTO settable(key, col_set_inet) VALUES (1, {'127.0.0.1', '127.0.0.2'});",
+ "INSERT INTO settable(key, col_set_int) VALUES (1, {123, 124});",
+ "INSERT INTO settable(key, col_set_text) VALUES (1, {'text1', 'text2'});",
+ "INSERT INTO settable(key, col_set_timestamp) VALUES (1, {'2011-02-03T04:05:00+0000', '2011-02-04T04:05:00+0000'});",
+ "INSERT INTO settable(key, col_set_timeuuid) VALUES (1, {e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f, e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77});",
+ "INSERT INTO settable(key, col_set_uuid) VALUES (1, {550e8400-e29b-41d4-a716-446655440000, 550e8400-e29b-41d4-a716-446655440001});",
+ "INSERT INTO settable(key, col_set_varchar) VALUES (1, {'varchar1', 'varchar2'});",
+ "INSERT INTO settable(key, col_set_varint) VALUES (1, {123, 124});",
+
+ "INSERT INTO listtable(key, col_list_ascii) VALUES (1, ['ascii2', 'ascii1']);",
+ "INSERT INTO listtable(key, col_list_bigint) VALUES (1, [12345679, 12345678]);",
+ "INSERT INTO listtable(key, col_list_blob) VALUES (1, [0x68656c6c6e, 0x68656c6c6f]);",
+ "INSERT INTO listtable(key, col_list_boolean) VALUES (1, [true, false]);",
+ "INSERT INTO listtable(key, col_list_decimal) VALUES (1, [23.4568, 23.4567]);",
+ "INSERT INTO listtable(key, col_list_double) VALUES (1, [12345678.12345679, 12345678.12345678]);",
+ "INSERT INTO listtable(key, col_list_float) VALUES (1, [123.13, 123.12]);",
+ "INSERT INTO listtable(key, col_list_inet) VALUES (1, ['127.0.0.2', '127.0.0.1']);",
+ "INSERT INTO listtable(key, col_list_int) VALUES (1, [124, 123]);",
+ "INSERT INTO listtable(key, col_list_text) VALUES (1, ['text2', 'text1']);",
+ "INSERT INTO listtable(key, col_list_timestamp) VALUES (1, ['2011-02-04T04:05:00+0000', '2011-02-03T04:05:00+0000']);",
+ "INSERT INTO listtable(key, col_list_timeuuid) VALUES (1, [e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77, e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f]);",
+ "INSERT INTO listtable(key, col_list_uuid) VALUES (1, [550e8400-e29b-41d4-a716-446655440001, 550e8400-e29b-41d4-a716-446655440000]);",
+ "INSERT INTO listtable(key, col_list_varchar) VALUES (1, ['varchar2', 'varchar1']);",
+ "INSERT INTO listtable(key, col_list_varint) VALUES (1, [124, 123]);",
+
+ "INSERT INTO maptable(key, col_map_ascii) VALUES (1, {'ascii1' : 'ascii2'});",
+ "INSERT INTO maptable(key, col_map_bigint) VALUES (1, {12345678 : 12345679});",
+ "INSERT INTO maptable(key, col_map_blob) VALUES (1, {0x68656c6c6f : 0x68656c6c6e});",
+ "INSERT INTO maptable(key, col_map_boolean) VALUES (1, {false : true});",
+ "INSERT INTO maptable(key, col_map_decimal) VALUES (1, {23.4567 : 23.4568});",
+ "INSERT INTO maptable(key, col_map_double) VALUES (1, {12345678.12345678 : 12345678.12345679});",
+ "INSERT INTO maptable(key, col_map_float) VALUES (1, {123.12 : 123.13});",
+ "INSERT INTO maptable(key, col_map_inet) VALUES (1, {'127.0.0.1' : '127.0.0.2'});",
+ "INSERT INTO maptable(key, col_map_int) VALUES (1, {123 : 124});",
+ "INSERT INTO maptable(key, col_map_text) VALUES (1, {'text1' : 'text2'});",
+ "INSERT INTO maptable(key, col_map_timestamp) VALUES (1, {'2011-02-03T04:05:00+0000' : '2011-02-04T04:05:00+0000'});",
+ "INSERT INTO maptable(key, col_map_timeuuid) VALUES (1, {e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f : e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77});",
+ "INSERT INTO maptable(key, col_map_uuid) VALUES (1, {550e8400-e29b-41d4-a716-446655440000 : 550e8400-e29b-41d4-a716-446655440001});",
+ "INSERT INTO maptable(key, col_map_varchar) VALUES (1, {'varchar1' : 'varchar2'});",
+ "INSERT INTO maptable(key, col_map_varint) VALUES (1, {123 : 124});",
+
+ "CREATE TABLE countertable (key int primary key, col_counter counter);",
+ "UPDATE countertable SET col_counter = col_counter + 3 WHERE key = 1;",
+ };
+
+ @BeforeClass
+ public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException,
+ AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException
+ {
+ startCassandra();
+ setupDataByCql(statements);
+ startHadoopCluster();
+ }
+
+ @Test
+ public void testCqlStorageRegularType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();");
+ Iterator<Tuple> it = pig.openIterator("rows");
+ //{key: int,
+ //col_ascii: chararray,
+ //col_bigint: long,
+ //col_blob: bytearray,
+ //col_boolean: bytearray,
+ //col_decimal: chararray,
+ //col_double: double,
+ //col_float: float,
+ //col_inet: chararray,
+ //col_int: int,
+ //col_text: chararray,
+ //col_timestamp: long,
+ //col_timeuuid: bytearray,
+ //col_uuid: chararray,
+ //col_varchar: chararray,
+ //col_varint: int}
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Assert.assertEquals(t.get(1), "ascii");
+ Assert.assertEquals(t.get(2), 12345678L);
+ Assert.assertEquals(t.get(3), new DataByteArray(Hex.hexToBytes("23446c6c6f")));
+ Assert.assertEquals(t.get(4), false);
+ Assert.assertEquals(t.get(5), "23.4567");
+ Assert.assertEquals(t.get(6), 12345678.12345678d);
+ Assert.assertEquals(t.get(7), 123.12f);
+ Assert.assertEquals(t.get(8), "127.0.0.1");
+ Assert.assertEquals(t.get(9), 123);
+ Assert.assertEquals(t.get(10), "text");
+ Assert.assertEquals(t.get(11), 1296705900000L);
+ Assert.assertEquals(t.get(12), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ Assert.assertEquals(t.get(13), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ Assert.assertEquals(t.get(14), "varchar");
+ Assert.assertEquals(t.get(15), 123);
+ }
+
+ pig.registerQuery("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + "' USING CqlStorage();");
+ it = pig.openIterator("cc_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Assert.assertEquals(t.get(1), 3L);
+ }
+ }
+
+ @Test
+ public void testCqlStorageSetType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ pig.registerQuery("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + "' USING CqlStorage();");
+ Iterator<Tuple> it = pig.openIterator("set_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Tuple innerTuple = (Tuple) t.get(1);
+ Assert.assertEquals(innerTuple.get(0), "ascii1");
+ Assert.assertEquals(innerTuple.get(1), "ascii2");
+ innerTuple = (Tuple) t.get(2);
+ Assert.assertEquals(innerTuple.get(0), 12345678L);
+ Assert.assertEquals(innerTuple.get(1), 12345679L);
+ innerTuple = (Tuple) t.get(3);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
+ innerTuple = (Tuple) t.get(4);
+ Assert.assertEquals(innerTuple.get(0), false);
+ Assert.assertEquals(innerTuple.get(1), true);
+ innerTuple = (Tuple) t.get(5);
+ Assert.assertEquals(innerTuple.get(0), "23.4567");
+ Assert.assertEquals(innerTuple.get(1), "23.4568");
+ innerTuple = (Tuple) t.get(6);
+ Assert.assertEquals(innerTuple.get(0), 12345678.12345678d);
+ Assert.assertEquals(innerTuple.get(1), 12345678.12345679d);
+ innerTuple = (Tuple) t.get(7);
+ Assert.assertEquals(innerTuple.get(0), 123.12f);
+ Assert.assertEquals(innerTuple.get(1), 123.13f);
+ innerTuple = (Tuple) t.get(8);
+ Assert.assertEquals(innerTuple.get(0), "127.0.0.1");
+ Assert.assertEquals(innerTuple.get(1), "127.0.0.2");
+ innerTuple = (Tuple) t.get(9);
+ Assert.assertEquals(innerTuple.get(0), 123);
+ Assert.assertEquals(innerTuple.get(1), 124);
+ innerTuple = (Tuple) t.get(10);
+ Assert.assertEquals(innerTuple.get(0), "text1");
+ Assert.assertEquals(innerTuple.get(1), "text2");
+ innerTuple = (Tuple) t.get(11);
+ Assert.assertEquals(innerTuple.get(0), 1296705900000L);
+ Assert.assertEquals(innerTuple.get(1), 1296792300000L);
+ innerTuple = (Tuple) t.get(12);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ innerTuple = (Tuple) t.get(13);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
+ innerTuple = (Tuple) t.get(14);
+ Assert.assertEquals(innerTuple.get(0), "varchar1");
+ Assert.assertEquals(innerTuple.get(1), "varchar2");
+ innerTuple = (Tuple) t.get(15);
+ Assert.assertEquals(innerTuple.get(0), 123);
+ Assert.assertEquals(innerTuple.get(1), 124);
+ }
+ }
+
+ @Test
+ public void testCqlStorageListType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ pig.registerQuery("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + "' USING CqlStorage();");
+ Iterator<Tuple> it = pig.openIterator("list_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Tuple innerTuple = (Tuple) t.get(1);
+ Assert.assertEquals(innerTuple.get(1), "ascii1");
+ Assert.assertEquals(innerTuple.get(0), "ascii2");
+ innerTuple = (Tuple) t.get(2);
+ Assert.assertEquals(innerTuple.get(1), 12345678L);
+ Assert.assertEquals(innerTuple.get(0), 12345679L);
+ innerTuple = (Tuple) t.get(3);
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
+ innerTuple = (Tuple) t.get(4);
+ Assert.assertEquals(innerTuple.get(1), false);
+ Assert.assertEquals(innerTuple.get(0), true);
+ innerTuple = (Tuple) t.get(5);
+ Assert.assertEquals(innerTuple.get(1), "23.4567");
+ Assert.assertEquals(innerTuple.get(0), "23.4568");
+ innerTuple = (Tuple) t.get(6);
+ Assert.assertEquals(innerTuple.get(1), 12345678.12345678d);
+ Assert.assertEquals(innerTuple.get(0), 12345678.12345679d);
+ innerTuple = (Tuple) t.get(7);
+ Assert.assertEquals(innerTuple.get(1), 123.12f);
+ Assert.assertEquals(innerTuple.get(0), 123.13f);
+ innerTuple = (Tuple) t.get(8);
+ Assert.assertEquals(innerTuple.get(1), "127.0.0.1");
+ Assert.assertEquals(innerTuple.get(0), "127.0.0.2");
+ innerTuple = (Tuple) t.get(9);
+ Assert.assertEquals(innerTuple.get(1), 123);
+ Assert.assertEquals(innerTuple.get(0), 124);
+ innerTuple = (Tuple) t.get(10);
+ Assert.assertEquals(innerTuple.get(1), "text1");
+ Assert.assertEquals(innerTuple.get(0), "text2");
+ innerTuple = (Tuple) t.get(11);
+ Assert.assertEquals(innerTuple.get(1), 1296705900000L);
+ Assert.assertEquals(innerTuple.get(0), 1296792300000L);
+ innerTuple = (Tuple) t.get(12);
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
+ innerTuple = (Tuple) t.get(13);
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
+ innerTuple = (Tuple) t.get(14);
+ Assert.assertEquals(innerTuple.get(1), "varchar1");
+ Assert.assertEquals(innerTuple.get(0), "varchar2");
+ innerTuple = (Tuple) t.get(15);
+ Assert.assertEquals(innerTuple.get(1), 123);
+ Assert.assertEquals(innerTuple.get(0), 124);
+ }
+ }
+
+ @Test
+ public void testCqlStorageMapType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ pig.registerQuery("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + "' USING CqlStorage();");
+ Iterator<Tuple> it = pig.openIterator("map_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Tuple innerTuple = (Tuple) ((Tuple) t.get(1)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "ascii1");
+ Assert.assertEquals(innerTuple.get(1), "ascii2");
+ innerTuple = (Tuple) ((Tuple) t.get(2)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 12345678L);
+ Assert.assertEquals(innerTuple.get(1), 12345679L);
+ innerTuple = (Tuple) ((Tuple) t.get(3)).get(0);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
+ innerTuple = (Tuple) ((Tuple) t.get(4)).get(0);
+ Assert.assertEquals(innerTuple.get(0), false);
+ Assert.assertEquals(innerTuple.get(1), true);
+ innerTuple = (Tuple) ((Tuple) t.get(5)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "23.4567");
+ Assert.assertEquals(innerTuple.get(1), "23.4568");
+ innerTuple = (Tuple) ((Tuple) t.get(6)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 12345678.12345678d);
+ Assert.assertEquals(innerTuple.get(1), 12345678.12345679d);
+ innerTuple = (Tuple) ((Tuple) t.get(7)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 123.12f);
+ Assert.assertEquals(innerTuple.get(1), 123.13f);
+ innerTuple = (Tuple) ((Tuple) t.get(8)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "127.0.0.1");
+ Assert.assertEquals(innerTuple.get(1), "127.0.0.2");
+ innerTuple = (Tuple) ((Tuple) t.get(9)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 123);
+ Assert.assertEquals(innerTuple.get(1), 124);
+ innerTuple = (Tuple) ((Tuple) t.get(10)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "text1");
+ Assert.assertEquals(innerTuple.get(1), "text2");
+ innerTuple = (Tuple) ((Tuple) t.get(11)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 1296705900000L);
+ Assert.assertEquals(innerTuple.get(1), 1296792300000L);
+ innerTuple = (Tuple) ((Tuple) t.get(12)).get(0);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
+ innerTuple = (Tuple) ((Tuple) t.get(13)).get(0);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
+ innerTuple = (Tuple) ((Tuple) t.get(14)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "varchar1");
+ Assert.assertEquals(innerTuple.get(1), "varchar2");
+ innerTuple = (Tuple) ((Tuple) t.get(15)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 123);
+ Assert.assertEquals(innerTuple.get(1), 124);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8ba4f/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java b/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
new file mode 100644
index 0000000..2f97549
--- /dev/null
+++ b/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
@@ -0,0 +1,181 @@
+package org.apache.cassandra.pig;
+
+import java.io.IOException;
+import java.nio.charset.CharacterCodingException;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.thrift.AuthenticationException;
+import org.apache.cassandra.thrift.AuthorizationException;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.Hex;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThriftColumnFamilyDataTypeTest extends PigTestBase
+{
+ //AsciiType
+ //LongType
+ //BytesType
+ //BooleanType
+ //CounterColumnType
+ //DecimalType
+ //DoubleType
+ //FloatType
+ //InetAddressType
+ //Int32Type
+ //UTF8Type
+ //DateType
+ //UUIDType
+ //IntegerType
+ //TimeUUIDType
+ //IntegerType
+ //LexicalUUIDType
+ private static String[] statements = {
+ "create keyspace thriftKs with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and" +
+ " strategy_options={replication_factor:1};",
+ "use thriftKs;",
+
+ "create column family SomeApp " +
+ " with comparator = UTF8Type " +
+ " and default_validation_class = UTF8Type " +
+ " and key_validation_class = UTF8Type " +
+ " and column_metadata = [" +
+ "{column_name: col_ascii, validation_class: AsciiType}, " +
+ "{column_name: col_long, validation_class: LongType}, " +
+ "{column_name: col_bytes, validation_class: BytesType}, " +
+ "{column_name: col_boolean, validation_class: BooleanType}, " +
+ "{column_name: col_decimal, validation_class: DecimalType}, " +
+ "{column_name: col_double, validation_class: DoubleType}, " +
+ "{column_name: col_float, validation_class: FloatType}," +
+ "{column_name: col_inetaddress, validation_class: InetAddressType}, " +
+ "{column_name: col_int32, validation_class: Int32Type}, " +
+ "{column_name: col_uft8, validation_class: UTF8Type}, " +
+ "{column_name: col_date, validation_class: DateType}, " +
+ "{column_name: col_uuid, validation_class: UUIDType}, " +
+ "{column_name: col_integer, validation_class: IntegerType}, " +
+ "{column_name: col_timeuuid, validation_class: TimeUUIDType}, " +
+ "{column_name: col_lexical_uuid, validation_class: LexicalUUIDType}, " +
+ "]; ",
+
+ "set SomeApp['foo']['col_ascii'] = 'ascii';",
+ "set SomeApp['foo']['col_boolean'] = false;",
+ "set SomeApp['foo']['col_bytes'] = 'DEADBEEF';",
+ "set SomeApp['foo']['col_date'] = '2011-02-03T04:05:00+0000';",
+ "set SomeApp['foo']['col_decimal'] = '23.345';",
+ "set SomeApp['foo']['col_double'] = '2.7182818284590451';",
+ "set SomeApp['foo']['col_float'] = '23.45';",
+ "set SomeApp['foo']['col_inetaddress'] = '127.0.0.1';",
+ "set SomeApp['foo']['col_int32'] = 23;",
+ "set SomeApp['foo']['col_integer'] = 12345;",
+ "set SomeApp['foo']['col_long'] = 12345678;",
+ "set SomeApp['foo']['col_lexical_uuid'] = 'e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77';",
+ "set SomeApp['foo']['col_timeuuid'] = 'e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f';",
+ "set SomeApp['foo']['col_uft8'] = 'hello';",
+ "set SomeApp['foo']['col_uuid'] = '550e8400-e29b-41d4-a716-446655440000';",
+
+ "create column family CC with " +
+ "key_validation_class = UTF8Type and " +
+ "default_validation_class=CounterColumnType " +
+ "and comparator=UTF8Type;",
+
+ "incr CC['chuck']['kick'];",
+ "incr CC['chuck']['kick'];",
+ "incr CC['chuck']['kick'];"
+ };
+
+ @BeforeClass
+ public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException,
+ AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException
+ {
+ startCassandra();
+ setupDataByCli(statements);
+ startHadoopCluster();
+ }
+
+ @Test
+ public void testCassandraStorageDataType() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
+ {
+ pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
+
+ //{key: chararray, col_ascii: (name: chararray,value: chararray),
+ //col_boolean: (name: chararray,value: bytearray),
+ //col_bytes: (name: chararray,value: bytearray),
+ //col_date: (name: chararray,value: long),
+ //col_decimal: (name: chararray,value: chararray),
+ //col_double: (name: chararray,value: double),
+ //col_float: (name: chararray,value: float),
+ //col_inetaddress: (name: chararray,value: chararray),
+ //col_int32: (name: chararray,value: int),
+ //col_integer: (name: chararray,value: int),
+ //col_lexical_uuid: (name: chararray,value: chararray),
+ //col_long: (name: chararray,value: long),
+ //col_timeuuid: (name: chararray,value: bytearray),
+ //col_uft8: (name: chararray,value: chararray),
+ //col_uuid: (name: chararray,value: chararray),
+ //columns: {(name: chararray,value: chararray)}}
+ Iterator<Tuple> it = pig.openIterator("rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), "foo");
+ Tuple column = (Tuple) t.get(1);
+ Assert.assertEquals(column.get(1), "ascii");
+ column = (Tuple) t.get(2);
+ Assert.assertEquals(column.get(1), false);
+ column = (Tuple) t.get(3);
+ Assert.assertEquals(column.get(1), new DataByteArray(Hex.hexToBytes("DEADBEEF")));
+ column = (Tuple) t.get(4);
+ Assert.assertEquals(column.get(1), 1296705900000L);
+ column = (Tuple) t.get(5);
+ Assert.assertEquals(column.get(1), "23.345");
+ column = (Tuple) t.get(6);
+ Assert.assertEquals(column.get(1), 2.7182818284590451d);
+ column = (Tuple) t.get(7);
+ Assert.assertEquals(column.get(1), 23.45f);
+ column = (Tuple) t.get(8);
+ Assert.assertEquals(column.get(1), "127.0.0.1");
+ column = (Tuple) t.get(9);
+ Assert.assertEquals(column.get(1), 23);
+ column = (Tuple) t.get(10);
+ Assert.assertEquals(column.get(1), 12345);
+ column = (Tuple) t.get(11);
+ Assert.assertEquals(column.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
+ column = (Tuple) t.get(12);
+ Assert.assertEquals(column.get(1), 12345678L);
+ column = (Tuple) t.get(13);
+ Assert.assertEquals(column.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ column = (Tuple) t.get(14);
+ Assert.assertEquals(column.get(1), "hello");
+ column = (Tuple) t.get(15);
+ Assert.assertEquals(column.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ }
+
+ pig.registerQuery("cc_rows = LOAD 'cassandra://thriftKs/CC?" + defaultParameters + "' USING CassandraStorage();");
+
+ //(chuck,{(kick,3)})
+ it = pig.openIterator("cc_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), "chuck");
+ DataBag columns = (DataBag) t.get(1);
+ Iterator<Tuple> iter = columns.iterator();
+ if(iter.hasNext())
+ {
+ Tuple column = iter.next();
+ Assert.assertEquals(column.get(0), "kick");
+ Assert.assertEquals(column.get(1), 3L);
+ }
+ }
+ }
+}
[5/6] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by br...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cf3bd6f8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cf3bd6f8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cf3bd6f8
Branch: refs/heads/cassandra-2.0
Commit: cf3bd6f8611684fc80390b660e3783b98599eecf
Parents: 91bd207 44f8ba4
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 18 09:45:25 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 18 09:45:25 2013 -0500
----------------------------------------------------------------------
.../apache/cassandra/hadoop/pig/CqlStorage.java | 15 +-
.../cassandra/pig/CqlTableDataTypeTest.java | 422 +++++++++++++++++++
.../pig/ThriftColumnFamilyDataTypeTest.java | 181 ++++++++
3 files changed, 614 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf3bd6f8/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
[6/6] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by br...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5433525f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5433525f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5433525f
Branch: refs/heads/trunk
Commit: 5433525f5ecda967a755dbff9f54ed96ce8bb81c
Parents: 3f8a90a cf3bd6f
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 18 09:45:36 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 18 09:45:36 2013 -0500
----------------------------------------------------------------------
.../apache/cassandra/hadoop/pig/CqlStorage.java | 15 +-
.../cassandra/pig/CqlTableDataTypeTest.java | 422 +++++++++++++++++++
.../pig/ThriftColumnFamilyDataTypeTest.java | 181 ++++++++
3 files changed, 614 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
[3/6] git commit: Add more pig unit tests for type mappings. Patch by
Alex Liu, reviewed by brandonwilliams for CASSANDRA-6197
Posted by br...@apache.org.
Add more pig unit tests for type mappings.
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6197
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44f8ba4f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44f8ba4f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44f8ba4f
Branch: refs/heads/trunk
Commit: 44f8ba4f702bce85265bf841daf32204e9ee8221
Parents: acd4f80
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 18 09:44:14 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 18 09:44:14 2013 -0500
----------------------------------------------------------------------
.../apache/cassandra/hadoop/pig/CqlStorage.java | 15 +-
.../cassandra/pig/CqlTableDataTypeTest.java | 422 +++++++++++++++++++
.../pig/ThriftColumnFamilyDataTypeTest.java | 181 ++++++++
3 files changed, 614 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8ba4f/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index e306005..0d01383 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -147,7 +147,7 @@ public class CqlStorage extends AbstractCassandraStorage
setMapTupleValues(tuple, position, value, validator);
return;
}
- AbstractType<?> elementValidator;
+ AbstractType elementValidator;
if (validator instanceof SetType)
elementValidator = ((SetType<?>) validator).elements;
else if (validator instanceof ListType)
@@ -159,7 +159,7 @@ public class CqlStorage extends AbstractCassandraStorage
Tuple innerTuple = TupleFactory.getInstance().newTuple(((Collection<?>) value).size());
for (Object entry : (Collection<?>) value)
{
- setTupleValue(innerTuple, i, entry, elementValidator);
+ setTupleValue(innerTuple, i, cassandraToPigData(entry, elementValidator), elementValidator);
i++;
}
tuple.set(position, innerTuple);
@@ -176,8 +176,8 @@ public class CqlStorage extends AbstractCassandraStorage
for(Map.Entry<?,?> entry : ((Map<Object, Object>)value).entrySet())
{
Tuple mapEntryTuple = TupleFactory.getInstance().newTuple(2);
- setTupleValue(mapEntryTuple, 0, entry.getKey(), keyValidator);
- setTupleValue(mapEntryTuple, 1, entry.getValue(), valueValidator);
+ setTupleValue(mapEntryTuple, 0, cassandraToPigData(entry.getKey(), keyValidator), keyValidator);
+ setTupleValue(mapEntryTuple, 1, cassandraToPigData(entry.getValue(), valueValidator), valueValidator);
innerTuple.set(i, mapEntryTuple);
i++;
}
@@ -703,5 +703,12 @@ public class CqlStorage extends AbstractCassandraStorage
throw new IOException("Unsupported expression type: " + opString);
}
}
+
+ private Object cassandraToPigData(Object obj, AbstractType validator)
+ {
+ if (validator instanceof DecimalType || validator instanceof InetAddressType)
+ return validator.getString(validator.decompose(obj));
+ return obj;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8ba4f/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java b/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java
new file mode 100644
index 0000000..a0f1f47
--- /dev/null
+++ b/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java
@@ -0,0 +1,422 @@
+package org.apache.cassandra.pig;
+
+import java.io.IOException;
+import java.nio.charset.CharacterCodingException;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.thrift.AuthenticationException;
+import org.apache.cassandra.thrift.AuthorizationException;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.SchemaDisagreementException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.Hex;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class CqlTableDataTypeTest extends PigTestBase
+{
+ //ASCII (AsciiType.instance),
+ //BIGINT (LongType.instance),
+ //BLOB (BytesType.instance),
+ //BOOLEAN (BooleanType.instance),
+ //COUNTER (CounterColumnType.instance),
+ //DECIMAL (DecimalType.instance),
+ //DOUBLE (DoubleType.instance),
+ //FLOAT (FloatType.instance),
+ //INET (InetAddressType.instance),
+ //INT (Int32Type.instance),
+ //TEXT (UTF8Type.instance),
+ //TIMESTAMP(DateType.instance),
+ //UUID (UUIDType.instance),
+ //VARCHAR (UTF8Type.instance),
+ //VARINT (IntegerType.instance),
+ //TIMEUUID (TimeUUIDType.instance);
+ //SET
+ //LIST
+ //MAP
+ //Create table to test the above data types
+ private static String[] statements = {
+ "CREATE KEYSPACE cql3ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}",
+ "USE cql3ks;",
+
+ "CREATE TABLE cqltable (" +
+ "key int primary key," +
+ "col_ascii ascii," +
+ "col_bigint bigint," +
+ "col_blob blob," +
+ "col_boolean boolean," +
+ "col_decimal decimal," +
+ "col_double double," +
+ "col_float float," +
+ "col_inet inet," +
+ "col_int int," +
+ "col_text text," +
+ "col_timestamp timestamp," +
+ "col_uuid uuid," +
+ "col_varchar varchar," +
+ "col_varint varint," +
+ "col_timeuuid timeuuid);",
+
+ "CREATE TABLE settable (" +
+ "key int primary key," +
+ "col_set_ascii set<ascii>," +
+ "col_set_bigint set<bigint>," +
+ "col_set_blob set<blob>," +
+ "col_set_boolean set<boolean>," +
+ "col_set_decimal set<decimal>," +
+ "col_set_double set<double>," +
+ "col_set_float set<float>," +
+ "col_set_inet set<inet>," +
+ "col_set_int set<int>," +
+ "col_set_text set<text>," +
+ "col_set_timestamp set<timestamp>," +
+ "col_set_uuid set<uuid>," +
+ "col_set_varchar set<varchar>," +
+ "col_set_varint set<varint>," +
+ "col_set_timeuuid set<timeuuid>);",
+
+ "CREATE TABLE listtable (" +
+ "key int primary key," +
+ "col_list_ascii list<ascii>," +
+ "col_list_bigint list<bigint>," +
+ "col_list_blob list<blob>," +
+ "col_list_boolean list<boolean>," +
+ "col_list_decimal list<decimal>," +
+ "col_list_double list<double>," +
+ "col_list_float list<float>," +
+ "col_list_inet list<inet>," +
+ "col_list_int list<int>," +
+ "col_list_text list<text>," +
+ "col_list_timestamp list<timestamp>," +
+ "col_list_uuid list<uuid>," +
+ "col_list_varchar list<varchar>," +
+ "col_list_varint list<varint>," +
+ "col_list_timeuuid list<timeuuid>);",
+
+ "CREATE TABLE maptable (" +
+ "key int primary key," +
+ "col_map_ascii map<ascii, ascii>," +
+ "col_map_bigint map<bigint, bigint>," +
+ "col_map_blob map<blob, blob>," +
+ "col_map_boolean map<boolean, boolean>," +
+ "col_map_decimal map<decimal, decimal>," +
+ "col_map_double map<double, double>," +
+ "col_map_float map<float, float>," +
+ "col_map_inet map<inet, inet>," +
+ "col_map_int map<int, int>," +
+ "col_map_text map<text, text>," +
+ "col_map_timestamp map<timestamp, timestamp>," +
+ "col_map_uuid map<uuid, uuid>," +
+ "col_map_varchar map<varchar, varchar>," +
+ "col_map_varint map<varint, varint>," +
+ "col_map_timeuuid map<timeuuid, timeuuid>);",
+
+ "INSERT INTO cqltable(key, col_ascii) VALUES (1, 'ascii');",
+ "INSERT INTO cqltable(key, col_bigint) VALUES (1, 12345678);",
+ "INSERT INTO cqltable(key, col_blob) VALUES (1, 0x23446c6c6f);",
+ "INSERT INTO cqltable(key, col_boolean) VALUES (1, false);",
+ "INSERT INTO cqltable(key, col_decimal) VALUES (1, 23.4567);",
+ "INSERT INTO cqltable(key, col_double) VALUES (1, 12345678.12345678);",
+ "INSERT INTO cqltable(key, col_float) VALUES (1, 123.12);",
+ "INSERT INTO cqltable(key, col_inet) VALUES (1, '127.0.0.1');",
+ "INSERT INTO cqltable(key, col_int) VALUES (1, 123);",
+ "INSERT INTO cqltable(key, col_text) VALUES (1, 'text');",
+ "INSERT INTO cqltable(key, col_timestamp) VALUES (1, '2011-02-03T04:05:00+0000');",
+ "INSERT INTO cqltable(key, col_timeuuid) VALUES (1, maxTimeuuid('2013-01-01 00:05+0000'));",
+ "INSERT INTO cqltable(key, col_uuid) VALUES (1, 550e8400-e29b-41d4-a716-446655440000);",
+ "INSERT INTO cqltable(key, col_varchar) VALUES (1, 'varchar');",
+ "INSERT INTO cqltable(key, col_varint) VALUES (1, 123);",
+
+ "INSERT INTO settable(key, col_set_ascii) VALUES (1, {'ascii1', 'ascii2'});",
+ "INSERT INTO settable(key, col_set_bigint) VALUES (1, {12345678, 12345679});",
+ "INSERT INTO settable(key, col_set_blob) VALUES (1, {0x68656c6c6f, 0x68656c6c6e});",
+ "INSERT INTO settable(key, col_set_boolean) VALUES (1, {false, true});",
+ "INSERT INTO settable(key, col_set_decimal) VALUES (1, {23.4567, 23.4568});",
+ "INSERT INTO settable(key, col_set_double) VALUES (1, {12345678.12345678, 12345678.12345679});",
+ "INSERT INTO settable(key, col_set_float) VALUES (1, {123.12, 123.13});",
+ "INSERT INTO settable(key, col_set_inet) VALUES (1, {'127.0.0.1', '127.0.0.2'});",
+ "INSERT INTO settable(key, col_set_int) VALUES (1, {123, 124});",
+ "INSERT INTO settable(key, col_set_text) VALUES (1, {'text1', 'text2'});",
+ "INSERT INTO settable(key, col_set_timestamp) VALUES (1, {'2011-02-03T04:05:00+0000', '2011-02-04T04:05:00+0000'});",
+ "INSERT INTO settable(key, col_set_timeuuid) VALUES (1, {e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f, e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77});",
+ "INSERT INTO settable(key, col_set_uuid) VALUES (1, {550e8400-e29b-41d4-a716-446655440000, 550e8400-e29b-41d4-a716-446655440001});",
+ "INSERT INTO settable(key, col_set_varchar) VALUES (1, {'varchar1', 'varchar2'});",
+ "INSERT INTO settable(key, col_set_varint) VALUES (1, {123, 124});",
+
+ "INSERT INTO listtable(key, col_list_ascii) VALUES (1, ['ascii2', 'ascii1']);",
+ "INSERT INTO listtable(key, col_list_bigint) VALUES (1, [12345679, 12345678]);",
+ "INSERT INTO listtable(key, col_list_blob) VALUES (1, [0x68656c6c6e, 0x68656c6c6f]);",
+ "INSERT INTO listtable(key, col_list_boolean) VALUES (1, [true, false]);",
+ "INSERT INTO listtable(key, col_list_decimal) VALUES (1, [23.4568, 23.4567]);",
+ "INSERT INTO listtable(key, col_list_double) VALUES (1, [12345678.12345679, 12345678.12345678]);",
+ "INSERT INTO listtable(key, col_list_float) VALUES (1, [123.13, 123.12]);",
+ "INSERT INTO listtable(key, col_list_inet) VALUES (1, ['127.0.0.2', '127.0.0.1']);",
+ "INSERT INTO listtable(key, col_list_int) VALUES (1, [124, 123]);",
+ "INSERT INTO listtable(key, col_list_text) VALUES (1, ['text2', 'text1']);",
+ "INSERT INTO listtable(key, col_list_timestamp) VALUES (1, ['2011-02-04T04:05:00+0000', '2011-02-03T04:05:00+0000']);",
+ "INSERT INTO listtable(key, col_list_timeuuid) VALUES (1, [e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77, e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f]);",
+ "INSERT INTO listtable(key, col_list_uuid) VALUES (1, [550e8400-e29b-41d4-a716-446655440001, 550e8400-e29b-41d4-a716-446655440000]);",
+ "INSERT INTO listtable(key, col_list_varchar) VALUES (1, ['varchar2', 'varchar1']);",
+ "INSERT INTO listtable(key, col_list_varint) VALUES (1, [124, 123]);",
+
+ "INSERT INTO maptable(key, col_map_ascii) VALUES (1, {'ascii1' : 'ascii2'});",
+ "INSERT INTO maptable(key, col_map_bigint) VALUES (1, {12345678 : 12345679});",
+ "INSERT INTO maptable(key, col_map_blob) VALUES (1, {0x68656c6c6f : 0x68656c6c6e});",
+ "INSERT INTO maptable(key, col_map_boolean) VALUES (1, {false : true});",
+ "INSERT INTO maptable(key, col_map_decimal) VALUES (1, {23.4567 : 23.4568});",
+ "INSERT INTO maptable(key, col_map_double) VALUES (1, {12345678.12345678 : 12345678.12345679});",
+ "INSERT INTO maptable(key, col_map_float) VALUES (1, {123.12 : 123.13});",
+ "INSERT INTO maptable(key, col_map_inet) VALUES (1, {'127.0.0.1' : '127.0.0.2'});",
+ "INSERT INTO maptable(key, col_map_int) VALUES (1, {123 : 124});",
+ "INSERT INTO maptable(key, col_map_text) VALUES (1, {'text1' : 'text2'});",
+ "INSERT INTO maptable(key, col_map_timestamp) VALUES (1, {'2011-02-03T04:05:00+0000' : '2011-02-04T04:05:00+0000'});",
+ "INSERT INTO maptable(key, col_map_timeuuid) VALUES (1, {e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f : e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77});",
+ "INSERT INTO maptable(key, col_map_uuid) VALUES (1, {550e8400-e29b-41d4-a716-446655440000 : 550e8400-e29b-41d4-a716-446655440001});",
+ "INSERT INTO maptable(key, col_map_varchar) VALUES (1, {'varchar1' : 'varchar2'});",
+ "INSERT INTO maptable(key, col_map_varint) VALUES (1, {123 : 124});",
+
+ "CREATE TABLE countertable (key int primary key, col_counter counter);",
+ "UPDATE countertable SET col_counter = col_counter + 3 WHERE key = 1;",
+ };
+
+ @BeforeClass
+ public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException,
+ AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException
+ {
+ startCassandra();
+ setupDataByCql(statements);
+ startHadoopCluster();
+ }
+
+ @Test
+ public void testCqlStorageRegularType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();");
+ Iterator<Tuple> it = pig.openIterator("rows");
+ //{key: int,
+ //col_ascii: chararray,
+ //col_bigint: long,
+ //col_blob: bytearray,
+ //col_boolean: bytearray,
+ //col_decimal: chararray,
+ //col_double: double,
+ //col_float: float,
+ //col_inet: chararray,
+ //col_int: int,
+ //col_text: chararray,
+ //col_timestamp: long,
+ //col_timeuuid: bytearray,
+ //col_uuid: chararray,
+ //col_varchar: chararray,
+ //col_varint: int}
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Assert.assertEquals(t.get(1), "ascii");
+ Assert.assertEquals(t.get(2), 12345678L);
+ Assert.assertEquals(t.get(3), new DataByteArray(Hex.hexToBytes("23446c6c6f")));
+ Assert.assertEquals(t.get(4), false);
+ Assert.assertEquals(t.get(5), "23.4567");
+ Assert.assertEquals(t.get(6), 12345678.12345678d);
+ Assert.assertEquals(t.get(7), 123.12f);
+ Assert.assertEquals(t.get(8), "127.0.0.1");
+ Assert.assertEquals(t.get(9), 123);
+ Assert.assertEquals(t.get(10), "text");
+ Assert.assertEquals(t.get(11), 1296705900000L);
+ Assert.assertEquals(t.get(12), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ Assert.assertEquals(t.get(13), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ Assert.assertEquals(t.get(14), "varchar");
+ Assert.assertEquals(t.get(15), 123);
+ }
+
+ pig.registerQuery("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + "' USING CqlStorage();");
+ it = pig.openIterator("cc_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Assert.assertEquals(t.get(1), 3L);
+ }
+ }
+
+ @Test
+ public void testCqlStorageSetType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ pig.registerQuery("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + "' USING CqlStorage();");
+ Iterator<Tuple> it = pig.openIterator("set_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Tuple innerTuple = (Tuple) t.get(1);
+ Assert.assertEquals(innerTuple.get(0), "ascii1");
+ Assert.assertEquals(innerTuple.get(1), "ascii2");
+ innerTuple = (Tuple) t.get(2);
+ Assert.assertEquals(innerTuple.get(0), 12345678L);
+ Assert.assertEquals(innerTuple.get(1), 12345679L);
+ innerTuple = (Tuple) t.get(3);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
+ innerTuple = (Tuple) t.get(4);
+ Assert.assertEquals(innerTuple.get(0), false);
+ Assert.assertEquals(innerTuple.get(1), true);
+ innerTuple = (Tuple) t.get(5);
+ Assert.assertEquals(innerTuple.get(0), "23.4567");
+ Assert.assertEquals(innerTuple.get(1), "23.4568");
+ innerTuple = (Tuple) t.get(6);
+ Assert.assertEquals(innerTuple.get(0), 12345678.12345678d);
+ Assert.assertEquals(innerTuple.get(1), 12345678.12345679d);
+ innerTuple = (Tuple) t.get(7);
+ Assert.assertEquals(innerTuple.get(0), 123.12f);
+ Assert.assertEquals(innerTuple.get(1), 123.13f);
+ innerTuple = (Tuple) t.get(8);
+ Assert.assertEquals(innerTuple.get(0), "127.0.0.1");
+ Assert.assertEquals(innerTuple.get(1), "127.0.0.2");
+ innerTuple = (Tuple) t.get(9);
+ Assert.assertEquals(innerTuple.get(0), 123);
+ Assert.assertEquals(innerTuple.get(1), 124);
+ innerTuple = (Tuple) t.get(10);
+ Assert.assertEquals(innerTuple.get(0), "text1");
+ Assert.assertEquals(innerTuple.get(1), "text2");
+ innerTuple = (Tuple) t.get(11);
+ Assert.assertEquals(innerTuple.get(0), 1296705900000L);
+ Assert.assertEquals(innerTuple.get(1), 1296792300000L);
+ innerTuple = (Tuple) t.get(12);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ innerTuple = (Tuple) t.get(13);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
+ innerTuple = (Tuple) t.get(14);
+ Assert.assertEquals(innerTuple.get(0), "varchar1");
+ Assert.assertEquals(innerTuple.get(1), "varchar2");
+ innerTuple = (Tuple) t.get(15);
+ Assert.assertEquals(innerTuple.get(0), 123);
+ Assert.assertEquals(innerTuple.get(1), 124);
+ }
+ }
+
+ @Test
+ public void testCqlStorageListType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ pig.registerQuery("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + "' USING CqlStorage();");
+ Iterator<Tuple> it = pig.openIterator("list_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Tuple innerTuple = (Tuple) t.get(1);
+ Assert.assertEquals(innerTuple.get(1), "ascii1");
+ Assert.assertEquals(innerTuple.get(0), "ascii2");
+ innerTuple = (Tuple) t.get(2);
+ Assert.assertEquals(innerTuple.get(1), 12345678L);
+ Assert.assertEquals(innerTuple.get(0), 12345679L);
+ innerTuple = (Tuple) t.get(3);
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
+ innerTuple = (Tuple) t.get(4);
+ Assert.assertEquals(innerTuple.get(1), false);
+ Assert.assertEquals(innerTuple.get(0), true);
+ innerTuple = (Tuple) t.get(5);
+ Assert.assertEquals(innerTuple.get(1), "23.4567");
+ Assert.assertEquals(innerTuple.get(0), "23.4568");
+ innerTuple = (Tuple) t.get(6);
+ Assert.assertEquals(innerTuple.get(1), 12345678.12345678d);
+ Assert.assertEquals(innerTuple.get(0), 12345678.12345679d);
+ innerTuple = (Tuple) t.get(7);
+ Assert.assertEquals(innerTuple.get(1), 123.12f);
+ Assert.assertEquals(innerTuple.get(0), 123.13f);
+ innerTuple = (Tuple) t.get(8);
+ Assert.assertEquals(innerTuple.get(1), "127.0.0.1");
+ Assert.assertEquals(innerTuple.get(0), "127.0.0.2");
+ innerTuple = (Tuple) t.get(9);
+ Assert.assertEquals(innerTuple.get(1), 123);
+ Assert.assertEquals(innerTuple.get(0), 124);
+ innerTuple = (Tuple) t.get(10);
+ Assert.assertEquals(innerTuple.get(1), "text1");
+ Assert.assertEquals(innerTuple.get(0), "text2");
+ innerTuple = (Tuple) t.get(11);
+ Assert.assertEquals(innerTuple.get(1), 1296705900000L);
+ Assert.assertEquals(innerTuple.get(0), 1296792300000L);
+ innerTuple = (Tuple) t.get(12);
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
+ innerTuple = (Tuple) t.get(13);
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
+ innerTuple = (Tuple) t.get(14);
+ Assert.assertEquals(innerTuple.get(1), "varchar1");
+ Assert.assertEquals(innerTuple.get(0), "varchar2");
+ innerTuple = (Tuple) t.get(15);
+ Assert.assertEquals(innerTuple.get(1), 123);
+ Assert.assertEquals(innerTuple.get(0), 124);
+ }
+ }
+
+ @Test
+ public void testCqlStorageMapType()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ pig.registerQuery("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + "' USING CqlStorage();");
+ Iterator<Tuple> it = pig.openIterator("map_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 1);
+ Tuple innerTuple = (Tuple) ((Tuple) t.get(1)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "ascii1");
+ Assert.assertEquals(innerTuple.get(1), "ascii2");
+ innerTuple = (Tuple) ((Tuple) t.get(2)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 12345678L);
+ Assert.assertEquals(innerTuple.get(1), 12345679L);
+ innerTuple = (Tuple) ((Tuple) t.get(3)).get(0);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
+ innerTuple = (Tuple) ((Tuple) t.get(4)).get(0);
+ Assert.assertEquals(innerTuple.get(0), false);
+ Assert.assertEquals(innerTuple.get(1), true);
+ innerTuple = (Tuple) ((Tuple) t.get(5)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "23.4567");
+ Assert.assertEquals(innerTuple.get(1), "23.4568");
+ innerTuple = (Tuple) ((Tuple) t.get(6)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 12345678.12345678d);
+ Assert.assertEquals(innerTuple.get(1), 12345678.12345679d);
+ innerTuple = (Tuple) ((Tuple) t.get(7)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 123.12f);
+ Assert.assertEquals(innerTuple.get(1), 123.13f);
+ innerTuple = (Tuple) ((Tuple) t.get(8)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "127.0.0.1");
+ Assert.assertEquals(innerTuple.get(1), "127.0.0.2");
+ innerTuple = (Tuple) ((Tuple) t.get(9)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 123);
+ Assert.assertEquals(innerTuple.get(1), 124);
+ innerTuple = (Tuple) ((Tuple) t.get(10)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "text1");
+ Assert.assertEquals(innerTuple.get(1), "text2");
+ innerTuple = (Tuple) ((Tuple) t.get(11)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 1296705900000L);
+ Assert.assertEquals(innerTuple.get(1), 1296792300000L);
+ innerTuple = (Tuple) ((Tuple) t.get(12)).get(0);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
+ innerTuple = (Tuple) ((Tuple) t.get(13)).get(0);
+ Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
+ innerTuple = (Tuple) ((Tuple) t.get(14)).get(0);
+ Assert.assertEquals(innerTuple.get(0), "varchar1");
+ Assert.assertEquals(innerTuple.get(1), "varchar2");
+ innerTuple = (Tuple) ((Tuple) t.get(15)).get(0);
+ Assert.assertEquals(innerTuple.get(0), 123);
+ Assert.assertEquals(innerTuple.get(1), 124);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8ba4f/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java b/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
new file mode 100644
index 0000000..2f97549
--- /dev/null
+++ b/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
@@ -0,0 +1,181 @@
+package org.apache.cassandra.pig;
+
+import java.io.IOException;
+import java.nio.charset.CharacterCodingException;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.thrift.AuthenticationException;
+import org.apache.cassandra.thrift.AuthorizationException;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.Hex;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThriftColumnFamilyDataTypeTest extends PigTestBase
+{
+ //AsciiType
+ //LongType
+ //BytesType
+ //BooleanType
+ //CounterColumnType
+ //DecimalType
+ //DoubleType
+ //FloatType
+ //InetAddressType
+ //Int32Type
+ //UTF8Type
+ //DateType
+ //UUIDType
+ //IntegerType
+ //TimeUUIDType
+ //IntegerType
+ //LexicalUUIDType
+ private static String[] statements = {
+ "create keyspace thriftKs with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and" +
+ " strategy_options={replication_factor:1};",
+ "use thriftKs;",
+
+ "create column family SomeApp " +
+ " with comparator = UTF8Type " +
+ " and default_validation_class = UTF8Type " +
+ " and key_validation_class = UTF8Type " +
+ " and column_metadata = [" +
+ "{column_name: col_ascii, validation_class: AsciiType}, " +
+ "{column_name: col_long, validation_class: LongType}, " +
+ "{column_name: col_bytes, validation_class: BytesType}, " +
+ "{column_name: col_boolean, validation_class: BooleanType}, " +
+ "{column_name: col_decimal, validation_class: DecimalType}, " +
+ "{column_name: col_double, validation_class: DoubleType}, " +
+ "{column_name: col_float, validation_class: FloatType}," +
+ "{column_name: col_inetaddress, validation_class: InetAddressType}, " +
+ "{column_name: col_int32, validation_class: Int32Type}, " +
+ "{column_name: col_uft8, validation_class: UTF8Type}, " +
+ "{column_name: col_date, validation_class: DateType}, " +
+ "{column_name: col_uuid, validation_class: UUIDType}, " +
+ "{column_name: col_integer, validation_class: IntegerType}, " +
+ "{column_name: col_timeuuid, validation_class: TimeUUIDType}, " +
+ "{column_name: col_lexical_uuid, validation_class: LexicalUUIDType}, " +
+ "]; ",
+
+ "set SomeApp['foo']['col_ascii'] = 'ascii';",
+ "set SomeApp['foo']['col_boolean'] = false;",
+ "set SomeApp['foo']['col_bytes'] = 'DEADBEEF';",
+ "set SomeApp['foo']['col_date'] = '2011-02-03T04:05:00+0000';",
+ "set SomeApp['foo']['col_decimal'] = '23.345';",
+ "set SomeApp['foo']['col_double'] = '2.7182818284590451';",
+ "set SomeApp['foo']['col_float'] = '23.45';",
+ "set SomeApp['foo']['col_inetaddress'] = '127.0.0.1';",
+ "set SomeApp['foo']['col_int32'] = 23;",
+ "set SomeApp['foo']['col_integer'] = 12345;",
+ "set SomeApp['foo']['col_long'] = 12345678;",
+ "set SomeApp['foo']['col_lexical_uuid'] = 'e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77';",
+ "set SomeApp['foo']['col_timeuuid'] = 'e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f';",
+ "set SomeApp['foo']['col_uft8'] = 'hello';",
+ "set SomeApp['foo']['col_uuid'] = '550e8400-e29b-41d4-a716-446655440000';",
+
+ "create column family CC with " +
+ "key_validation_class = UTF8Type and " +
+ "default_validation_class=CounterColumnType " +
+ "and comparator=UTF8Type;",
+
+ "incr CC['chuck']['kick'];",
+ "incr CC['chuck']['kick'];",
+ "incr CC['chuck']['kick'];"
+ };
+
+ @BeforeClass
+ public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException,
+ AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException
+ {
+ startCassandra();
+ setupDataByCli(statements);
+ startHadoopCluster();
+ }
+
+ @Test
+ public void testCassandraStorageDataType() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
+ {
+ pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
+
+ //{key: chararray, col_ascii: (name: chararray,value: chararray),
+ //col_boolean: (name: chararray,value: bytearray),
+ //col_bytes: (name: chararray,value: bytearray),
+ //col_date: (name: chararray,value: long),
+ //col_decimal: (name: chararray,value: chararray),
+ //col_double: (name: chararray,value: double),
+ //col_float: (name: chararray,value: float),
+ //col_inetaddress: (name: chararray,value: chararray),
+ //col_int32: (name: chararray,value: int),
+ //col_integer: (name: chararray,value: int),
+ //col_lexical_uuid: (name: chararray,value: chararray),
+ //col_long: (name: chararray,value: long),
+ //col_timeuuid: (name: chararray,value: bytearray),
+ //col_uft8: (name: chararray,value: chararray),
+ //col_uuid: (name: chararray,value: chararray),
+ //columns: {(name: chararray,value: chararray)}}
+ Iterator<Tuple> it = pig.openIterator("rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), "foo");
+ Tuple column = (Tuple) t.get(1);
+ Assert.assertEquals(column.get(1), "ascii");
+ column = (Tuple) t.get(2);
+ Assert.assertEquals(column.get(1), false);
+ column = (Tuple) t.get(3);
+ Assert.assertEquals(column.get(1), new DataByteArray(Hex.hexToBytes("DEADBEEF")));
+ column = (Tuple) t.get(4);
+ Assert.assertEquals(column.get(1), 1296705900000L);
+ column = (Tuple) t.get(5);
+ Assert.assertEquals(column.get(1), "23.345");
+ column = (Tuple) t.get(6);
+ Assert.assertEquals(column.get(1), 2.7182818284590451d);
+ column = (Tuple) t.get(7);
+ Assert.assertEquals(column.get(1), 23.45f);
+ column = (Tuple) t.get(8);
+ Assert.assertEquals(column.get(1), "127.0.0.1");
+ column = (Tuple) t.get(9);
+ Assert.assertEquals(column.get(1), 23);
+ column = (Tuple) t.get(10);
+ Assert.assertEquals(column.get(1), 12345);
+ column = (Tuple) t.get(11);
+ Assert.assertEquals(column.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
+ column = (Tuple) t.get(12);
+ Assert.assertEquals(column.get(1), 12345678L);
+ column = (Tuple) t.get(13);
+ Assert.assertEquals(column.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
+ column = (Tuple) t.get(14);
+ Assert.assertEquals(column.get(1), "hello");
+ column = (Tuple) t.get(15);
+ Assert.assertEquals(column.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
+ }
+
+ pig.registerQuery("cc_rows = LOAD 'cassandra://thriftKs/CC?" + defaultParameters + "' USING CassandraStorage();");
+
+ //(chuck,{(kick,3)})
+ it = pig.openIterator("cc_rows");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), "chuck");
+ DataBag columns = (DataBag) t.get(1);
+ Iterator<Tuple> iter = columns.iterator();
+ if(iter.hasNext())
+ {
+ Tuple column = iter.next();
+ Assert.assertEquals(column.get(0), "kick");
+ Assert.assertEquals(column.get(1), 3L);
+ }
+ }
+ }
+}
[4/6] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by br...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cf3bd6f8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cf3bd6f8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cf3bd6f8
Branch: refs/heads/trunk
Commit: cf3bd6f8611684fc80390b660e3783b98599eecf
Parents: 91bd207 44f8ba4
Author: Brandon Williams <br...@apache.org>
Authored: Fri Oct 18 09:45:25 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Oct 18 09:45:25 2013 -0500
----------------------------------------------------------------------
.../apache/cassandra/hadoop/pig/CqlStorage.java | 15 +-
.../cassandra/pig/CqlTableDataTypeTest.java | 422 +++++++++++++++++++
.../pig/ThriftColumnFamilyDataTypeTest.java | 181 ++++++++
3 files changed, 614 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cf3bd6f8/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------