You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/11/27 17:44:32 UTC
[2/7] move pig-test out of normal unit tests (still part of test-all)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
new file mode 100644
index 0000000..223cbf4
--- /dev/null
+++ b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
@@ -0,0 +1,827 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.pig;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.Iterator;
+
+import org.apache.cassandra.cli.CliMain;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.thrift.AuthenticationException;
+import org.apache.cassandra.thrift.AuthorizationException;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.TimedOutException;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ThriftColumnFamilyTest extends PigTestBase
+{
+ private static String[] statements = {
+ "create keyspace thriftKs with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and" +
+ " strategy_options={replication_factor:1};",
+ "use thriftKs;",
+
+ "create column family SomeApp " +
+ " with comparator = UTF8Type " +
+ " and default_validation_class = UTF8Type " +
+ " and key_validation_class = UTF8Type " +
+ " and column_metadata = [{column_name: name, validation_class: UTF8Type, index_type: KEYS}, " +
+ "{column_name: vote_type, validation_class: UTF8Type}, " +
+ "{column_name: rating, validation_class: Int32Type}, " +
+ "{column_name: score, validation_class: LongType}, " +
+ "{column_name: percent, validation_class: FloatType}, " +
+ "{column_name: atomic_weight, validation_class: DoubleType}, " +
+ "{column_name: created, validation_class: DateType},]; ",
+
+ "create column family CopyOfSomeApp " +
+ "with key_validation_class = UTF8Type " +
+ "and default_validation_class = UTF8Type " +
+ "and comparator = UTF8Type " +
+ "and column_metadata = " +
+ "[ " +
+ "{column_name: name, validation_class: UTF8Type, index_type: KEYS}, " +
+ "{column_name: vote_type, validation_class: UTF8Type}, " +
+ "{column_name: rating, validation_class: Int32Type}, " +
+ "{column_name: score, validation_class: LongType}, " +
+ "{column_name: percent, validation_class: FloatType}, " +
+ "{column_name: atomic_weight, validation_class: DoubleType}, " +
+ "{column_name: created, validation_class: DateType}, " +
+ "];",
+
+ "set SomeApp['foo']['name'] = 'User Foo';",
+ "set SomeApp['foo']['vote_type'] = 'like';",
+ "set SomeApp['foo']['rating'] = 8;",
+ "set SomeApp['foo']['score'] = 125000;",
+ "set SomeApp['foo']['percent'] = '85.0';",
+ "set SomeApp['foo']['atomic_weight'] = '2.7182818284590451';",
+ "set SomeApp['foo']['created'] = 1335890877;",
+
+ "set SomeApp['bar']['name'] = 'User Bar';",
+ "set SomeApp['bar']['vote_type'] = 'like';",
+ "set SomeApp['bar']['rating'] = 9;",
+ "set SomeApp['bar']['score'] = 15000;",
+ "set SomeApp['bar']['percent'] = '35.0';",
+ "set SomeApp['bar']['atomic_weight'] = '3.1415926535897931';",
+ "set SomeApp['bar']['created'] = 1335890877;",
+
+ "set SomeApp['baz']['name'] = 'User Baz';",
+ "set SomeApp['baz']['vote_type'] = 'dislike';",
+ "set SomeApp['baz']['rating'] = 3;",
+ "set SomeApp['baz']['score'] = 512000;",
+ "set SomeApp['baz']['percent'] = '95.3';",
+ "set SomeApp['baz']['atomic_weight'] = '1.61803399';",
+ "set SomeApp['baz']['created'] = 1335890877;",
+ "set SomeApp['baz']['extra1'] = 'extra1';",
+ "set SomeApp['baz']['extra2'] = 'extra2';",
+ "set SomeApp['baz']['extra3'] = 'extra3';",
+
+ "set SomeApp['qux']['name'] = 'User Qux';",
+ "set SomeApp['qux']['vote_type'] = 'dislike';",
+ "set SomeApp['qux']['rating'] = 2;",
+ "set SomeApp['qux']['score'] = 12000;",
+ "set SomeApp['qux']['percent'] = '64.7';",
+ "set SomeApp['qux']['atomic_weight'] = '0.660161815846869';",
+ "set SomeApp['qux']['created'] = 1335890877;",
+ "set SomeApp['qux']['extra1'] = 'extra1';",
+ "set SomeApp['qux']['extra2'] = 'extra2';",
+ "set SomeApp['qux']['extra3'] = 'extra3';",
+ "set SomeApp['qux']['extra4'] = 'extra4';",
+ "set SomeApp['qux']['extra5'] = 'extra5';",
+ "set SomeApp['qux']['extra6'] = 'extra6';",
+ "set SomeApp['qux']['extra7'] = 'extra7';",
+
+ "create column family U8 with " +
+ "key_validation_class = UTF8Type and " +
+ "comparator = UTF8Type;",
+
+ "create column family Bytes with " +
+ "key_validation_class = BytesType and " +
+ "comparator = UTF8Type;",
+
+ "set U8['foo']['x'] = ascii('Z');",
+ "set Bytes[ascii('foo')]['x'] = ascii('Z');",
+
+ "create column family CC with " +
+ "key_validation_class = UTF8Type and " +
+ "default_validation_class=CounterColumnType " +
+ "and comparator=UTF8Type;",
+
+ "incr CC['chuck']['kick'];",
+ "incr CC['chuck']['kick'];",
+ "incr CC['chuck']['kick'];",
+ "incr CC['chuck']['fist'];",
+
+ "create column family Compo " +
+ "with key_validation_class = UTF8Type " +
+ "and default_validation_class = UTF8Type " +
+ "and comparator = 'CompositeType(UTF8Type,UTF8Type)';",
+
+ "set Compo['punch']['bruce:lee'] = 'ouch';",
+ "set Compo['punch']['bruce:bruce'] = 'hunh?';",
+ "set Compo['kick']['bruce:lee'] = 'oww';",
+ "set Compo['kick']['bruce:bruce'] = 'watch it, mate';",
+
+ "create column family CompoInt " +
+ "with key_validation_class = UTF8Type " +
+ "and default_validation_class = UTF8Type " +
+ "and comparator = 'CompositeType(LongType,LongType)';",
+
+ "set CompoInt['clock']['1:0'] = 'z';",
+ "set CompoInt['clock']['1:30'] = 'zzzz';",
+ "set CompoInt['clock']['2:30'] = 'daddy?';",
+ "set CompoInt['clock']['6:30'] = 'coffee...';",
+
+ "create column family CompoIntCopy " +
+ "with key_validation_class = UTF8Type " +
+ "and default_validation_class = UTF8Type " +
+ "and comparator = 'CompositeType(LongType,LongType)';",
+
+ "create column family CompoKey " +
+ "with key_validation_class = 'CompositeType(UTF8Type,LongType)' " +
+ "and default_validation_class = UTF8Type " +
+ "and comparator = LongType;",
+
+ "set CompoKey['clock:10']['1'] = 'z';",
+ "set CompoKey['clock:20']['1'] = 'zzzz';",
+ "set CompoKey['clock:30']['2'] = 'daddy?';",
+ "set CompoKey['clock:40']['6'] = 'coffee...';",
+
+ "create column family CompoKeyCopy " +
+ "with key_validation_class = 'CompositeType(UTF8Type,LongType)' " +
+ "and default_validation_class = UTF8Type " +
+ "and comparator = LongType;"
+ };
+
+ @BeforeClass
+ public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException,
+ AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException
+ {
+ startCassandra();
+ setupDataByCli(statements);
+ startHadoopCluster();
+ }
+
+ @Test
+ public void testCqlStorage() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
+ {
+ //regular thrift column families
+ pig.registerQuery("data = load 'cql://thriftKs/SomeApp?" + defaultParameters + "' using CqlStorage();");
+
+ //(bar,3.141592653589793,1335890877,User Bar,35.0,9,15000,like)
+ //(baz,1.61803399,1335890877,User Baz,95.3,3,512000,dislike)
+ //(foo,2.718281828459045,1335890877,User Foo,85.0,8,125000,like)
+ //(qux,0.660161815846869,1335890877,User Qux,64.7,2,12000,dislike)
+
+ //{key: chararray,atomic_weight: double,created: long,name: chararray,percent: float,rating: int,score: long,vote_type: chararray}
+ Iterator<Tuple> it = pig.openIterator("data");
+ int count = 0;
+ while (it.hasNext()) {
+ count ++;
+ Tuple t = it.next();
+ if (count == 1)
+ {
+ Assert.assertEquals(t.get(0), "bar");
+ Assert.assertEquals(t.get(1), 3.141592653589793d);
+ Assert.assertEquals(t.get(3), "User Bar");
+ Assert.assertEquals(t.get(4), 35.0f);
+ Assert.assertEquals(t.get(5), 9);
+ Assert.assertEquals(t.get(6), 15000L);
+ Assert.assertEquals(t.get(7), "like");
+ }
+ else if (count == 2)
+ {
+ Assert.assertEquals(t.get(0), "baz");
+ Assert.assertEquals(t.get(1), 1.61803399d);
+ Assert.assertEquals(t.get(3), "User Baz");
+ Assert.assertEquals(t.get(4), 95.3f);
+ Assert.assertEquals(t.get(5), 3);
+ Assert.assertEquals(t.get(6), 512000L);
+ Assert.assertEquals(t.get(7), "dislike");
+ }else if (count == 3)
+ {
+ Assert.assertEquals(t.get(0), "foo");
+ Assert.assertEquals(t.get(1), 2.718281828459045d);
+ Assert.assertEquals(t.get(3), "User Foo");
+ Assert.assertEquals(t.get(4), 85.0f);
+ Assert.assertEquals(t.get(5), 8);
+ Assert.assertEquals(t.get(6), 125000L);
+ Assert.assertEquals(t.get(7), "like");
+ }
+ else if (count == 4)
+ {
+ Assert.assertEquals(t.get(0), "qux");
+ Assert.assertEquals(t.get(1), 0.660161815846869d);
+ Assert.assertEquals(t.get(3), "User Qux");
+ Assert.assertEquals(t.get(4), 64.7f);
+ Assert.assertEquals(t.get(5), 2);
+ Assert.assertEquals(t.get(6), 12000L);
+ Assert.assertEquals(t.get(7), "dislike");
+ }
+ }
+ Assert.assertEquals(count, 4);
+
+ //Test counter colun family
+ pig.registerQuery("cc_data = load 'cql://thriftKs/CC?" + defaultParameters + "' using CqlStorage();");
+
+ //(chuck,fist,1)
+ //(chuck,kick,3)
+
+ // {key: chararray,column1: chararray,value: long}
+ it = pig.openIterator("cc_data");
+ count = 0;
+ while (it.hasNext()) {
+ count ++;
+ Tuple t = it.next();
+ if (count == 1)
+ {
+ Assert.assertEquals(t.get(0), "chuck");
+ Assert.assertEquals(t.get(1), "fist");
+ Assert.assertEquals(t.get(2), 1L);
+ }
+ else if (count == 2)
+ {
+ Assert.assertEquals(t.get(0), "chuck");
+ Assert.assertEquals(t.get(1), "kick");
+ Assert.assertEquals(t.get(2), 3L);
+ }
+ }
+ Assert.assertEquals(count, 2);
+
+ //Test composite column family
+ pig.registerQuery("compo_data = load 'cql://thriftKs/Compo?" + defaultParameters + "' using CqlStorage();");
+
+ //(kick,bruce,bruce,watch it, mate)
+ //(kick,bruce,lee,oww)
+ //(punch,bruce,bruce,hunh?)
+ //(punch,bruce,lee,ouch)
+
+ //{key: chararray,column1: chararray,column2: chararray,value: chararray}
+ it = pig.openIterator("compo_data");
+ count = 0;
+ while (it.hasNext()) {
+ count ++;
+ Tuple t = it.next();
+ if (count == 1)
+ {
+ Assert.assertEquals(t.get(0), "kick");
+ Assert.assertEquals(t.get(1), "bruce");
+ Assert.assertEquals(t.get(2), "bruce");
+ Assert.assertEquals(t.get(3), "watch it, mate");
+ }
+ else if (count == 2)
+ {
+ Assert.assertEquals(t.get(0), "kick");
+ Assert.assertEquals(t.get(1), "bruce");
+ Assert.assertEquals(t.get(2), "lee");
+ Assert.assertEquals(t.get(3), "oww");
+ }
+ else if (count == 3)
+ {
+ Assert.assertEquals(t.get(0), "punch");
+ Assert.assertEquals(t.get(1), "bruce");
+ Assert.assertEquals(t.get(2), "bruce");
+ Assert.assertEquals(t.get(3), "hunh?");
+ }
+ else if (count == 4)
+ {
+ Assert.assertEquals(t.get(0), "punch");
+ Assert.assertEquals(t.get(1), "bruce");
+ Assert.assertEquals(t.get(2), "lee");
+ Assert.assertEquals(t.get(3), "ouch");
+ }
+ }
+ Assert.assertEquals(count, 4);
+ }
+
+ @Test
+ public void testCassandraStorageSchema() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
+ {
+ //results: (qux,(atomic_weight,0.660161815846869),(created,1335890877),(name,User Qux),(percent,64.7),
+ //(rating,2),(score,12000),(vote_type,dislike),{(extra1,extra1),
+ //(extra2,extra2),(extra3,extra3),
+ //(extra4,extra4),(extra5,extra5),
+ //(extra6,extra6),(extra7,extra7)})
+ pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
+
+ //schema: {key: chararray,atomic_weight: (name: chararray,value: double),created: (name: chararray,value: long),
+ //name: (name: chararray,value: chararray),percent: (name: chararray,value: float),
+ //rating: (name: chararray,value: int),score: (name: chararray,value: long),
+ //vote_type: (name: chararray,value: chararray),columns: {(name: chararray,value: chararray)}}
+ Iterator<Tuple> it = pig.openIterator("rows");
+ int count = 0;
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ String rowKey = t.get(0).toString();
+ if ("qux".equals(rowKey))
+ {
+ Tuple column = (Tuple) t.get(1);
+ Assert.assertEquals(column.get(0), "atomic_weight");
+ Assert.assertEquals(column.get(1), 0.660161815846869d);
+ column = (Tuple) t.get(3);
+ Assert.assertEquals(column.get(0), "name");
+ Assert.assertEquals(column.get(1), "User Qux");
+ column = (Tuple) t.get(4);
+ Assert.assertEquals(column.get(0), "percent");
+ Assert.assertEquals(column.get(1), 64.7f);
+ column = (Tuple) t.get(5);
+ Assert.assertEquals(column.get(0), "rating");
+ Assert.assertEquals(column.get(1), 2);
+ column = (Tuple) t.get(6);
+ Assert.assertEquals(column.get(0), "score");
+ Assert.assertEquals(column.get(1), 12000L);
+ column = (Tuple) t.get(7);
+ Assert.assertEquals(column.get(0), "vote_type");
+ Assert.assertEquals(column.get(1), "dislike");
+ DataBag columns = (DataBag) t.get(8);
+ Iterator<Tuple> iter = columns.iterator();
+ int i = 0;
+ while(iter.hasNext())
+ {
+ i++;
+ column = iter.next();
+ Assert.assertEquals(column.get(0), "extra"+i);
+ }
+ Assert.assertEquals(7, columns.size());
+ }
+
+ }
+ }
+
+ @Test
+ public void testCassandraStorageFullCopy() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
+ {
+ createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+ pig.setBatchOn();
+ pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
+ //full copy
+ pig.registerQuery("STORE rows INTO 'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING CassandraStorage();");
+ pig.executeBatch();
+ Assert.assertEquals("User Qux", getColumnValue("thriftKs", "CopyOfSomeApp", "name", "qux", "UTF8Type"));
+ Assert.assertEquals("dislike", getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "qux", "UTF8Type"));
+ Assert.assertEquals("64.7", getColumnValue("thriftKs", "CopyOfSomeApp", "percent", "qux", "FloatType"));
+ }
+
+ @Test
+ public void testCassandraStorageSigleTupleCopy() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
+ {
+ createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+ pig.setBatchOn();
+ pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
+ //sigle tuple
+ pig.registerQuery("onecol = FOREACH rows GENERATE key, percent;");
+ pig.registerQuery("STORE onecol INTO 'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING CassandraStorage();");
+ pig.executeBatch();
+ String value = null;
+ try
+ {
+ value = getColumnValue("thriftKs", "CopyOfSomeApp", "name", "qux", "UTF8Type");
+ }
+ catch (NotFoundException e)
+ {
+ Assert.assertTrue(true);
+ }
+ if (value != null)
+ Assert.fail();
+ try
+ {
+ value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "qux", "UTF8Type");
+ }
+ catch (NotFoundException e)
+ {
+ Assert.assertTrue(true);
+ }
+ if (value != null)
+ Assert.fail();
+ Assert.assertEquals("64.7", getColumnValue("thriftKs", "CopyOfSomeApp", "percent", "qux", "FloatType"));
+ }
+
+ @Test
+ public void testCassandraStorageBagOnlyCopy() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
+ {
+ createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+ pig.setBatchOn();
+ pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
+ //bag only
+ pig.registerQuery("other = FOREACH rows GENERATE key, columns;");
+ pig.registerQuery("STORE other INTO 'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING CassandraStorage();");
+ pig.executeBatch();
+ String value = null;
+ try
+ {
+ value = getColumnValue("thriftKs", "CopyOfSomeApp", "name", "qux", "UTF8Type");
+ }
+ catch (NotFoundException e)
+ {
+ Assert.assertTrue(true);
+ }
+ if (value != null)
+ Assert.fail();
+ try
+ {
+ value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "qux", "UTF8Type");
+ }
+ catch (NotFoundException e)
+ {
+ Assert.assertTrue(true);
+ }
+ if (value != null)
+ Assert.fail();
+ try
+ {
+ value = getColumnValue("thriftKs", "CopyOfSomeApp", "percent", "qux", "FloatType");
+ }
+ catch (NotFoundException e)
+ {
+ Assert.assertTrue(true);
+ }
+ if (value != null)
+ Assert.fail();
+ Assert.assertEquals("extra1", getColumnValue("thriftKs", "CopyOfSomeApp", "extra1", "qux", "UTF8Type"));
+ }
+
+ @Test
+ public void testCassandraStorageFilter() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
+ {
+ createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+ pig.setBatchOn();
+ pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
+
+ //filter
+ pig.registerQuery("likes = FILTER rows by vote_type.value eq 'like' and rating.value > 5;");
+ pig.registerQuery("STORE likes INTO 'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING CassandraStorage();");
+ pig.executeBatch();
+
+ Assert.assertEquals("like", getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "bar", "UTF8Type"));
+ Assert.assertEquals("like", getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "foo", "UTF8Type"));
+ String value = null;
+ try
+ {
+ value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "qux", "UTF8Type");
+ }
+ catch (NotFoundException e)
+ {
+ Assert.assertTrue(true);
+ }
+ if (value != null)
+ Assert.fail();
+ try
+ {
+ value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "baz", "UTF8Type");
+ }
+ catch (NotFoundException e)
+ {
+ Assert.assertTrue(true);
+ }
+ if (value != null)
+ Assert.fail();
+
+ createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+ pig.setBatchOn();
+ pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
+ pig.registerQuery("dislikes_extras = FILTER rows by vote_type.value eq 'dislike' AND COUNT(columns) > 0;");
+ pig.registerQuery("STORE dislikes_extras INTO 'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING CassandraStorage();");
+ pig.registerQuery("visible = FILTER rows BY COUNT(columns) == 0;");
+ pig.executeBatch();
+ Assert.assertEquals("dislike", getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "baz", "UTF8Type"));
+ Assert.assertEquals("dislike", getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "qux", "UTF8Type"));
+ value = null;
+ try
+ {
+ value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "bar", "UTF8Type");
+ }
+ catch (NotFoundException e)
+ {
+ Assert.assertTrue(true);
+ }
+ if (value != null)
+ Assert.fail();
+ try
+ {
+ value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "foo", "UTF8Type");
+ }
+ catch (NotFoundException e)
+ {
+ Assert.assertTrue(true);
+ }
+ if (value != null)
+ Assert.fail();
+ }
+
+ @Test
+ public void testCassandraStorageJoin() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
+ {
+ //test key types with a join
+ pig.registerQuery("U8 = load 'cassandra://thriftKs/U8?" + defaultParameters + "' using CassandraStorage();");
+ pig.registerQuery("Bytes = load 'cassandra://thriftKs/Bytes?" + defaultParameters + "' using CassandraStorage();");
+
+ //cast key to chararray
+ pig.registerQuery("b = foreach Bytes generate (chararray)key, columns;");
+
+ //key in Bytes is a bytearray, U8 chararray
+ //(foo,{(x,Z)},foo,{(x,Z)})
+ pig.registerQuery("a = join Bytes by key, U8 by key;");
+ Iterator<Tuple> it = pig.openIterator("a");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), new DataByteArray("foo".getBytes()));
+ DataBag columns = (DataBag) t.get(1);
+ Iterator<Tuple> iter = columns.iterator();
+ Tuple t1 = iter.next();
+ Assert.assertEquals(t1.get(0), "x");
+ Assert.assertEquals(t1.get(1), new DataByteArray("Z".getBytes()));
+ String column = (String) t.get(2);
+ Assert.assertEquals(column, "foo");
+ columns = (DataBag) t.get(3);
+ iter = columns.iterator();
+ Tuple t2 = iter.next();
+ Assert.assertEquals(t2.get(0), "x");
+ Assert.assertEquals(t2.get(1), new DataByteArray("Z".getBytes()));
+ }
+ //key should now be cast into a chararray
+ //(foo,{(x,Z)},foo,{(x,Z)})
+ pig.registerQuery("c = join b by (chararray)key, U8 by (chararray)key;");
+ it = pig.openIterator("c");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), "foo");
+ DataBag columns = (DataBag) t.get(1);
+ Iterator<Tuple> iter = columns.iterator();
+ Tuple t1 = iter.next();
+ Assert.assertEquals(t1.get(0), "x");
+ Assert.assertEquals(t1.get(1), new DataByteArray("Z".getBytes()));
+ String column = (String) t.get(2);
+ Assert.assertEquals(column, "foo");
+ columns = (DataBag) t.get(3);
+ iter = columns.iterator();
+ Tuple t2 = iter.next();
+ Assert.assertEquals(t2.get(0), "x");
+ Assert.assertEquals(t2.get(1), new DataByteArray("Z".getBytes()));
+ }
+ }
+
+ @Test
+ public void testCassandraStorageCounterCF() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
+ {
+ pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
+
+ //Test counter column family support
+ pig.registerQuery("CC = load 'cassandra://thriftKs/CC?" + defaultParameters + "' using CassandraStorage();");
+ pig.registerQuery("total_hits = foreach CC generate key, SUM(columns.value);");
+ //(chuck,4)
+ Iterator<Tuple> it = pig.openIterator("total_hits");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), "chuck");
+ Assert.assertEquals(t.get(1), 4l);
+ }
+ }
+
+ @Test
+ public void testCassandraStorageCompositeColumnCF() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
+ {
+ //Test CompositeType
+ pig.registerQuery("compo = load 'cassandra://thriftKs/Compo?" + defaultParameters + "' using CassandraStorage();");
+ pig.registerQuery("compo = foreach compo generate key as method, flatten(columns);");
+ pig.registerQuery("lee = filter compo by columns::name == ('bruce','lee');");
+
+ //(kick,(bruce,lee),oww)
+ //(punch,(bruce,lee),ouch)
+ Iterator<Tuple> it = pig.openIterator("lee");
+ int count = 0;
+ while (it.hasNext()) {
+ count ++;
+ Tuple t = it.next();
+ if (count == 1)
+ Assert.assertEquals(t.get(0), "kick");
+ else
+ Assert.assertEquals(t.get(0), "punch");
+ Tuple t1 = (Tuple) t.get(1);
+ Assert.assertEquals(t1.get(0), "bruce");
+ Assert.assertEquals(t1.get(1), "lee");
+ if (count == 1)
+ Assert.assertEquals(t.get(2), "oww");
+ else
+ Assert.assertEquals(t.get(2), "ouch");
+ }
+ Assert.assertEquals(count, 2);
+ pig.registerQuery("night = load 'cassandra://thriftKs/CompoInt?" + defaultParameters + "' using CassandraStorage();");
+ pig.registerQuery("night = foreach night generate flatten(columns);");
+ pig.registerQuery("night = foreach night generate (int)columns::name.$0+(double)columns::name.$1/60 as hour, columns::value as noise;");
+
+ //What happens at the darkest hour?
+ pig.registerQuery("darkest = filter night by hour > 2 and hour < 5;");
+
+ //(2.5,daddy?)
+ it = pig.openIterator("darkest");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), 2.5d);
+ Assert.assertEquals(t.get(1), "daddy?");
+ }
+ pig.setBatchOn();
+ pig.registerQuery("compo_int_rows = LOAD 'cassandra://thriftKs/CompoInt?" + defaultParameters + "' using CassandraStorage();");
+ pig.registerQuery("STORE compo_int_rows INTO 'cassandra://thriftKs/CompoIntCopy?" + defaultParameters + "' using CassandraStorage();");
+ pig.executeBatch();
+ pig.registerQuery("compocopy_int_rows = LOAD 'cassandra://thriftKs/CompoIntCopy?" + defaultParameters + "' using CassandraStorage();");
+ //(clock,{((1,0),z),((1,30),zzzz),((2,30),daddy?),((6,30),coffee...)})
+ it = pig.openIterator("compocopy_int_rows");
+ count = 0;
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), "clock");
+ DataBag columns = (DataBag) t.get(1);
+ Iterator<Tuple> iter = columns.iterator();
+ while (iter.hasNext())
+ {
+ count ++;
+ Tuple t1 = iter.next();
+ Tuple inner = (Tuple) t1.get(0);
+ if (count == 1)
+ {
+ Assert.assertEquals(inner.get(0), 1L);
+ Assert.assertEquals(inner.get(1), 0L);
+ Assert.assertEquals(t1.get(1), "z");
+ }
+ else if (count == 2)
+ {
+ Assert.assertEquals(inner.get(0), 1L);
+ Assert.assertEquals(inner.get(1), 30L);
+ Assert.assertEquals(t1.get(1), "zzzz");
+ }
+ else if (count == 3)
+ {
+ Assert.assertEquals(inner.get(0), 2L);
+ Assert.assertEquals(inner.get(1), 30L);
+ Assert.assertEquals(t1.get(1), "daddy?");
+ }
+ else if (count == 4)
+ {
+ Assert.assertEquals(inner.get(0), 6L);
+ Assert.assertEquals(inner.get(1), 30L);
+ Assert.assertEquals(t1.get(1), "coffee...");
+ }
+ }
+ Assert.assertEquals(count, 4);
+ }
+ }
+
+ @Test
+ public void testCassandraStorageCompositeKeyCF() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
+ {
+ //Test CompositeKey
+ pig.registerQuery("compokeys = load 'cassandra://thriftKs/CompoKey?" + defaultParameters + "' using CassandraStorage();");
+ pig.registerQuery("compokeys = filter compokeys by key.$1 == 40;");
+ //((clock,40),{(6,coffee...)})
+ Iterator<Tuple> it = pig.openIterator("compokeys");
+ if (it.hasNext()) {
+ Tuple t = it.next();
+ Tuple key = (Tuple) t.get(0);
+ Assert.assertEquals(key.get(0), "clock");
+ Assert.assertEquals(key.get(1), 40L);
+ DataBag columns = (DataBag) t.get(1);
+ Iterator<Tuple> iter = columns.iterator();
+ if (iter.hasNext())
+ {
+ Tuple t1 = iter.next();
+ Assert.assertEquals(t1.get(0), 6L);
+ Assert.assertEquals(t1.get(1), "coffee...");
+ }
+ }
+ pig.setBatchOn();
+ pig.registerQuery("compo_key_rows = LOAD 'cassandra://thriftKs/CompoKey?" + defaultParameters + "' using CassandraStorage();");
+ pig.registerQuery("STORE compo_key_rows INTO 'cassandra://thriftKs/CompoKeyCopy?" + defaultParameters + "' using CassandraStorage();");
+ pig.executeBatch();
+ pig.registerQuery("compo_key_copy_rows = LOAD 'cassandra://thriftKs/CompoKeyCopy?" + defaultParameters + "' using CassandraStorage();");
+ //((clock,10),{(1,z)})
+ //((clock,20),{(1,zzzz)})
+ //((clock,30),{(2,daddy?)})
+ //((clock,40),{(6,coffee...)})
+ it = pig.openIterator("compo_key_copy_rows");
+ int count = 0;
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ count ++;
+ if (count == 1)
+ {
+ Tuple key = (Tuple) t.get(0);
+ Assert.assertEquals(key.get(0), "clock");
+ Assert.assertEquals(key.get(1), 10L);
+ DataBag columns = (DataBag) t.get(1);
+ Iterator<Tuple> iter = columns.iterator();
+ if (iter.hasNext())
+ {
+ Tuple t1 = iter.next();
+ Assert.assertEquals(t1.get(0), 1L);
+ Assert.assertEquals(t1.get(1), "z");
+ }
+ }
+ else if (count == 2)
+ {
+ Tuple key = (Tuple) t.get(0);
+ Assert.assertEquals(key.get(0), "clock");
+ Assert.assertEquals(key.get(1), 20L);
+ DataBag columns = (DataBag) t.get(1);
+ Iterator<Tuple> iter = columns.iterator();
+ if (iter.hasNext())
+ {
+ Tuple t1 = iter.next();
+ Assert.assertEquals(t1.get(0), 1L);
+ Assert.assertEquals(t1.get(1), "zzzz");
+ }
+ }
+ else if (count == 3)
+ {
+ Tuple key = (Tuple) t.get(0);
+ Assert.assertEquals(key.get(0), "clock");
+ Assert.assertEquals(key.get(1), 30L);
+ DataBag columns = (DataBag) t.get(1);
+ Iterator<Tuple> iter = columns.iterator();
+ if (iter.hasNext())
+ {
+ Tuple t1 = iter.next();
+ Assert.assertEquals(t1.get(0), 2L);
+ Assert.assertEquals(t1.get(1), "daddy?");
+ }
+ }
+ else if (count == 4)
+ {
+ Tuple key = (Tuple) t.get(0);
+ Assert.assertEquals(key.get(0), "clock");
+ Assert.assertEquals(key.get(1), 40L);
+ DataBag columns = (DataBag) t.get(1);
+ Iterator<Tuple> iter = columns.iterator();
+ if (iter.hasNext())
+ {
+ Tuple t1 = iter.next();
+ Assert.assertEquals(t1.get(0), 6L);
+ Assert.assertEquals(t1.get(1), "coffee...");
+ }
+ }
+ }
+ Assert.assertEquals(count, 4);
+ }
+
+ private String getColumnValue(String ks, String cf, String colName, String key, String validator)
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, IOException
+ {
+ Cassandra.Client client = getClient();
+ client.set_keyspace(ks);
+
+ ByteBuffer key_user_id = ByteBufferUtil.bytes(key);
+
+ long timestamp = System.currentTimeMillis();
+ ColumnPath cp = new ColumnPath(cf);
+ ColumnParent par = new ColumnParent(cf);
+ cp.column = ByteBufferUtil.bytes(colName);
+
+ // read
+ ColumnOrSuperColumn got = client.get(key_user_id, cp, ConsistencyLevel.ONE);
+ return parseType(validator).getString(got.getColumn().value);
+ }
+
+ private void createColumnFamily(String ks, String cf, String statement) throws CharacterCodingException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
+ {
+ CliMain.connect("127.0.0.1", 9170);
+ try
+ {
+ CliMain.processStatement("use " + ks + ";");
+ CliMain.processStatement("drop column family " + cf + ";");
+ }
+ catch (Exception e)
+ {
+ }
+ CliMain.processStatement(statement);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/pig/org/apache/pig/test/MiniCluster.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/pig/test/MiniCluster.java b/test/pig/org/apache/pig/test/MiniCluster.java
new file mode 100644
index 0000000..3216392
--- /dev/null
+++ b/test/pig/org/apache/pig/test/MiniCluster.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+
+public class MiniCluster extends MiniGenericCluster {
+ private MiniMRCluster m_mr = null;
+ public MiniCluster() {
+ super();
+ }
+
+ @Override
+ protected void setupMiniDfsAndMrClusters() {
+ try {
+ System.setProperty("hadoop.log.dir", "build/test/logs");
+ final int dataNodes = 4; // There will be 4 data nodes
+ final int taskTrackers = 4; // There will be 4 task tracker nodes
+
+ // Create the configuration hadoop-site.xml file
+ File conf_dir = new File("build/classes/");
+ conf_dir.mkdirs();
+ File conf_file = new File(conf_dir, "hadoop-site.xml");
+
+ conf_file.delete();
+
+ // Builds and starts the mini dfs and mapreduce clusters
+ Configuration config = new Configuration();
+ m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
+ m_fileSys = m_dfs.getFileSystem();
+ m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1);
+
+ // Write the necessary config info to hadoop-site.xml
+ m_conf = m_mr.createJobConf();
+ m_conf.setInt("mapred.submit.replication", 2);
+ m_conf.set("dfs.datanode.address", "0.0.0.0:0");
+ m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
+ m_conf.set("mapred.map.max.attempts", "2");
+ m_conf.set("mapred.reduce.max.attempts", "2");
+ m_conf.set("pig.jobcontrol.sleep", "100");
+ m_conf.writeXml(new FileOutputStream(conf_file));
+
+ // Set the system properties needed by Pig
+ System.setProperty("cluster", m_conf.get("mapred.job.tracker"));
+ System.setProperty("namenode", m_conf.get("fs.default.name"));
+ System.setProperty("junit.hadoop.conf", conf_dir.getPath());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected void shutdownMiniMrClusters() {
+ if (m_mr != null) { m_mr.shutdown(); }
+ m_mr = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/pig/org/apache/pig/test/MiniGenericCluster.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/pig/test/MiniGenericCluster.java b/test/pig/org/apache/pig/test/MiniGenericCluster.java
new file mode 100644
index 0000000..ac3f5bc
--- /dev/null
+++ b/test/pig/org/apache/pig/test/MiniGenericCluster.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.*;
+import java.util.Properties;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+
+/**
+ * This class builds a single instance of itself with the Singleton
+ * design pattern. While building the single instance, it sets up a
+ * mini cluster that actually consists of a mini DFS cluster and a
+ * mini MapReduce cluster on the local machine and also sets up the
+ * environment for Pig to run on top of the mini cluster.
+ *
+ * This class is the base class for MiniCluster, which has slightly
+ * difference among different versions of hadoop. MiniCluster implementation
+ * is located in $PIG_HOME/shims.
+ */
+abstract public class MiniGenericCluster {
+ protected MiniDFSCluster m_dfs = null;
+ protected FileSystem m_fileSys = null;
+ protected Configuration m_conf = null;
+
+ protected final static MiniCluster INSTANCE = new MiniCluster();
+ protected static boolean isSetup = true;
+
+ protected MiniGenericCluster() {
+ setupMiniDfsAndMrClusters();
+ }
+
+ abstract protected void setupMiniDfsAndMrClusters();
+
+ /**
+ * Returns the single instance of class MiniClusterBuilder that
+ * represents the resouces for a mini dfs cluster and a mini
+ * mapreduce cluster.
+ */
+ public static MiniCluster buildCluster() {
+ if(! isSetup){
+ INSTANCE.setupMiniDfsAndMrClusters();
+ isSetup = true;
+ }
+ return INSTANCE;
+ }
+
+ public void shutDown(){
+ INSTANCE.shutdownMiniDfsAndMrClusters();
+ }
+
+ protected void finalize() {
+ shutdownMiniDfsAndMrClusters();
+ }
+
+ protected void shutdownMiniDfsAndMrClusters() {
+ isSetup = false;
+ shutdownMiniDfsClusters();
+ shutdownMiniMrClusters();
+ }
+
+ protected void shutdownMiniDfsClusters() {
+ try {
+ if (m_fileSys != null) { m_fileSys.close(); }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ if (m_dfs != null) { m_dfs.shutdown(); }
+ m_fileSys = null;
+ m_dfs = null;
+ }
+
+ abstract protected void shutdownMiniMrClusters();
+
+ public Properties getProperties() {
+ errorIfNotSetup();
+ return ConfigurationUtil.toProperties(m_conf);
+ }
+
+ public Configuration getConfiguration() {
+ return new Configuration(m_conf);
+ }
+
+ public void setProperty(String name, String value) {
+ errorIfNotSetup();
+ m_conf.set(name, value);
+ }
+
+ public FileSystem getFileSystem() {
+ errorIfNotSetup();
+ return m_fileSys;
+ }
+
+ /**
+ * Throw RunTimeException if isSetup is false
+ */
+ private void errorIfNotSetup(){
+ if(isSetup)
+ return;
+ String msg = "function called on MiniCluster that has been shutdown";
+ throw new RuntimeException(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java b/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java
deleted file mode 100644
index 1ae9806..0000000
--- a/test/unit/org/apache/cassandra/pig/CqlTableDataTypeTest.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.pig;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.IOException;
-import java.nio.charset.CharacterCodingException;
-import java.util.Iterator;
-
-import org.apache.cassandra.db.marshal.TimeUUIDType;
-import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.thrift.AuthenticationException;
-import org.apache.cassandra.thrift.AuthorizationException;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.NotFoundException;
-import org.apache.cassandra.thrift.SchemaDisagreementException;
-import org.apache.cassandra.thrift.TimedOutException;
-import org.apache.cassandra.thrift.UnavailableException;
-import org.apache.cassandra.utils.Hex;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class CqlTableDataTypeTest extends PigTestBase
-{
- //ASCII (AsciiType.instance),
- //BIGINT (LongType.instance),
- //BLOB (BytesType.instance),
- //BOOLEAN (BooleanType.instance),
- //COUNTER (CounterColumnType.instance),
- //DECIMAL (DecimalType.instance),
- //DOUBLE (DoubleType.instance),
- //FLOAT (FloatType.instance),
- //INET (InetAddressType.instance),
- //INT (Int32Type.instance),
- //TEXT (UTF8Type.instance),
- //TIMESTAMP(DateType.instance),
- //UUID (UUIDType.instance),
- //VARCHAR (UTF8Type.instance),
- //VARINT (IntegerType.instance),
- //TIMEUUID (TimeUUIDType.instance);
- //SET
- //LIST
- //MAP
- //Create table to test the above data types
- private static String[] statements = {
- "CREATE KEYSPACE cql3ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}",
- "USE cql3ks;",
-
- "CREATE TABLE cqltable (" +
- "key int primary key," +
- "col_ascii ascii," +
- "col_bigint bigint," +
- "col_blob blob," +
- "col_boolean boolean," +
- "col_decimal decimal," +
- "col_double double," +
- "col_float float," +
- "col_inet inet," +
- "col_int int," +
- "col_text text," +
- "col_timestamp timestamp," +
- "col_uuid uuid," +
- "col_varchar varchar," +
- "col_varint varint," +
- "col_timeuuid timeuuid);",
-
- "CREATE TABLE settable (" +
- "key int primary key," +
- "col_set_ascii set<ascii>," +
- "col_set_bigint set<bigint>," +
- "col_set_blob set<blob>," +
- "col_set_boolean set<boolean>," +
- "col_set_decimal set<decimal>," +
- "col_set_double set<double>," +
- "col_set_float set<float>," +
- "col_set_inet set<inet>," +
- "col_set_int set<int>," +
- "col_set_text set<text>," +
- "col_set_timestamp set<timestamp>," +
- "col_set_uuid set<uuid>," +
- "col_set_varchar set<varchar>," +
- "col_set_varint set<varint>," +
- "col_set_timeuuid set<timeuuid>);",
-
- "CREATE TABLE listtable (" +
- "key int primary key," +
- "col_list_ascii list<ascii>," +
- "col_list_bigint list<bigint>," +
- "col_list_blob list<blob>," +
- "col_list_boolean list<boolean>," +
- "col_list_decimal list<decimal>," +
- "col_list_double list<double>," +
- "col_list_float list<float>," +
- "col_list_inet list<inet>," +
- "col_list_int list<int>," +
- "col_list_text list<text>," +
- "col_list_timestamp list<timestamp>," +
- "col_list_uuid list<uuid>," +
- "col_list_varchar list<varchar>," +
- "col_list_varint list<varint>," +
- "col_list_timeuuid list<timeuuid>);",
-
- "CREATE TABLE maptable (" +
- "key int primary key," +
- "col_map_ascii map<ascii, ascii>," +
- "col_map_bigint map<bigint, bigint>," +
- "col_map_blob map<blob, blob>," +
- "col_map_boolean map<boolean, boolean>," +
- "col_map_decimal map<decimal, decimal>," +
- "col_map_double map<double, double>," +
- "col_map_float map<float, float>," +
- "col_map_inet map<inet, inet>," +
- "col_map_int map<int, int>," +
- "col_map_text map<text, text>," +
- "col_map_timestamp map<timestamp, timestamp>," +
- "col_map_uuid map<uuid, uuid>," +
- "col_map_varchar map<varchar, varchar>," +
- "col_map_varint map<varint, varint>," +
- "col_map_timeuuid map<timeuuid, timeuuid>);",
-
- "INSERT INTO cqltable(key, col_ascii) VALUES (1, 'ascii');",
- "INSERT INTO cqltable(key, col_bigint) VALUES (1, 12345678);",
- "INSERT INTO cqltable(key, col_blob) VALUES (1, 0x23446c6c6f);",
- "INSERT INTO cqltable(key, col_boolean) VALUES (1, false);",
- "INSERT INTO cqltable(key, col_decimal) VALUES (1, 23.4567);",
- "INSERT INTO cqltable(key, col_double) VALUES (1, 12345678.12345678);",
- "INSERT INTO cqltable(key, col_float) VALUES (1, 123.12);",
- "INSERT INTO cqltable(key, col_inet) VALUES (1, '127.0.0.1');",
- "INSERT INTO cqltable(key, col_int) VALUES (1, 123);",
- "INSERT INTO cqltable(key, col_text) VALUES (1, 'text');",
- "INSERT INTO cqltable(key, col_timestamp) VALUES (1, '2011-02-03T04:05:00+0000');",
- "INSERT INTO cqltable(key, col_timeuuid) VALUES (1, maxTimeuuid('2013-01-01 00:05+0000'));",
- "INSERT INTO cqltable(key, col_uuid) VALUES (1, 550e8400-e29b-41d4-a716-446655440000);",
- "INSERT INTO cqltable(key, col_varchar) VALUES (1, 'varchar');",
- "INSERT INTO cqltable(key, col_varint) VALUES (1, 123);",
-
- "INSERT INTO settable(key, col_set_ascii) VALUES (1, {'ascii1', 'ascii2'});",
- "INSERT INTO settable(key, col_set_bigint) VALUES (1, {12345678, 12345679});",
- "INSERT INTO settable(key, col_set_blob) VALUES (1, {0x68656c6c6f, 0x68656c6c6e});",
- "INSERT INTO settable(key, col_set_boolean) VALUES (1, {false, true});",
- "INSERT INTO settable(key, col_set_decimal) VALUES (1, {23.4567, 23.4568});",
- "INSERT INTO settable(key, col_set_double) VALUES (1, {12345678.12345678, 12345678.12345679});",
- "INSERT INTO settable(key, col_set_float) VALUES (1, {123.12, 123.13});",
- "INSERT INTO settable(key, col_set_inet) VALUES (1, {'127.0.0.1', '127.0.0.2'});",
- "INSERT INTO settable(key, col_set_int) VALUES (1, {123, 124});",
- "INSERT INTO settable(key, col_set_text) VALUES (1, {'text1', 'text2'});",
- "INSERT INTO settable(key, col_set_timestamp) VALUES (1, {'2011-02-03T04:05:00+0000', '2011-02-04T04:05:00+0000'});",
- "INSERT INTO settable(key, col_set_timeuuid) VALUES (1, {e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f, e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77});",
- "INSERT INTO settable(key, col_set_uuid) VALUES (1, {550e8400-e29b-41d4-a716-446655440000, 550e8400-e29b-41d4-a716-446655440001});",
- "INSERT INTO settable(key, col_set_varchar) VALUES (1, {'varchar1', 'varchar2'});",
- "INSERT INTO settable(key, col_set_varint) VALUES (1, {123, 124});",
-
- "INSERT INTO listtable(key, col_list_ascii) VALUES (1, ['ascii2', 'ascii1']);",
- "INSERT INTO listtable(key, col_list_bigint) VALUES (1, [12345679, 12345678]);",
- "INSERT INTO listtable(key, col_list_blob) VALUES (1, [0x68656c6c6e, 0x68656c6c6f]);",
- "INSERT INTO listtable(key, col_list_boolean) VALUES (1, [true, false]);",
- "INSERT INTO listtable(key, col_list_decimal) VALUES (1, [23.4568, 23.4567]);",
- "INSERT INTO listtable(key, col_list_double) VALUES (1, [12345678.12345679, 12345678.12345678]);",
- "INSERT INTO listtable(key, col_list_float) VALUES (1, [123.13, 123.12]);",
- "INSERT INTO listtable(key, col_list_inet) VALUES (1, ['127.0.0.2', '127.0.0.1']);",
- "INSERT INTO listtable(key, col_list_int) VALUES (1, [124, 123]);",
- "INSERT INTO listtable(key, col_list_text) VALUES (1, ['text2', 'text1']);",
- "INSERT INTO listtable(key, col_list_timestamp) VALUES (1, ['2011-02-04T04:05:00+0000', '2011-02-03T04:05:00+0000']);",
- "INSERT INTO listtable(key, col_list_timeuuid) VALUES (1, [e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77, e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f]);",
- "INSERT INTO listtable(key, col_list_uuid) VALUES (1, [550e8400-e29b-41d4-a716-446655440001, 550e8400-e29b-41d4-a716-446655440000]);",
- "INSERT INTO listtable(key, col_list_varchar) VALUES (1, ['varchar2', 'varchar1']);",
- "INSERT INTO listtable(key, col_list_varint) VALUES (1, [124, 123]);",
-
- "INSERT INTO maptable(key, col_map_ascii) VALUES (1, {'ascii1' : 'ascii2'});",
- "INSERT INTO maptable(key, col_map_bigint) VALUES (1, {12345678 : 12345679});",
- "INSERT INTO maptable(key, col_map_blob) VALUES (1, {0x68656c6c6f : 0x68656c6c6e});",
- "INSERT INTO maptable(key, col_map_boolean) VALUES (1, {false : true});",
- "INSERT INTO maptable(key, col_map_decimal) VALUES (1, {23.4567 : 23.4568});",
- "INSERT INTO maptable(key, col_map_double) VALUES (1, {12345678.12345678 : 12345678.12345679});",
- "INSERT INTO maptable(key, col_map_float) VALUES (1, {123.12 : 123.13});",
- "INSERT INTO maptable(key, col_map_inet) VALUES (1, {'127.0.0.1' : '127.0.0.2'});",
- "INSERT INTO maptable(key, col_map_int) VALUES (1, {123 : 124});",
- "INSERT INTO maptable(key, col_map_text) VALUES (1, {'text1' : 'text2'});",
- "INSERT INTO maptable(key, col_map_timestamp) VALUES (1, {'2011-02-03T04:05:00+0000' : '2011-02-04T04:05:00+0000'});",
- "INSERT INTO maptable(key, col_map_timeuuid) VALUES (1, {e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f : e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77});",
- "INSERT INTO maptable(key, col_map_uuid) VALUES (1, {550e8400-e29b-41d4-a716-446655440000 : 550e8400-e29b-41d4-a716-446655440001});",
- "INSERT INTO maptable(key, col_map_varchar) VALUES (1, {'varchar1' : 'varchar2'});",
- "INSERT INTO maptable(key, col_map_varint) VALUES (1, {123 : 124});",
-
- "CREATE TABLE countertable (key int primary key, col_counter counter);",
- "UPDATE countertable SET col_counter = col_counter + 3 WHERE key = 1;",
- };
-
- @BeforeClass
- public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException,
- AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException
- {
- startCassandra();
- setupDataByCql(statements);
- startHadoopCluster();
- }
-
- @Test
- public void testCqlStorageRegularType()
- throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
- {
- pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();");
- Iterator<Tuple> it = pig.openIterator("rows");
- //{key: int,
- //col_ascii: chararray,
- //col_bigint: long,
- //col_blob: bytearray,
- //col_boolean: bytearray,
- //col_decimal: chararray,
- //col_double: double,
- //col_float: float,
- //col_inet: chararray,
- //col_int: int,
- //col_text: chararray,
- //col_timestamp: long,
- //col_timeuuid: bytearray,
- //col_uuid: chararray,
- //col_varchar: chararray,
- //col_varint: int}
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0), 1);
- Assert.assertEquals(t.get(1), "ascii");
- Assert.assertEquals(t.get(2), 12345678L);
- Assert.assertEquals(t.get(3), new DataByteArray(Hex.hexToBytes("23446c6c6f")));
- Assert.assertEquals(t.get(4), false);
- Assert.assertEquals(t.get(5), "23.4567");
- Assert.assertEquals(t.get(6), 12345678.12345678d);
- Assert.assertEquals(t.get(7), 123.12f);
- Assert.assertEquals(t.get(8), "127.0.0.1");
- Assert.assertEquals(t.get(9), 123);
- Assert.assertEquals(t.get(10), "text");
- Assert.assertEquals(t.get(11), 1296705900000L);
- Assert.assertEquals(t.get(12), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
- Assert.assertEquals(t.get(13), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
- Assert.assertEquals(t.get(14), "varchar");
- Assert.assertEquals(t.get(15), 123);
- }
-
- pig.registerQuery("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + "' USING CqlStorage();");
- it = pig.openIterator("cc_rows");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0), 1);
- Assert.assertEquals(t.get(1), 3L);
- }
- }
-
- @Test
- public void testCqlStorageSetType()
- throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
- {
- pig.registerQuery("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + "' USING CqlStorage();");
- Iterator<Tuple> it = pig.openIterator("set_rows");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0), 1);
- Tuple innerTuple = (Tuple) t.get(1);
- Assert.assertEquals(innerTuple.get(0), "ascii1");
- Assert.assertEquals(innerTuple.get(1), "ascii2");
- innerTuple = (Tuple) t.get(2);
- Assert.assertEquals(innerTuple.get(0), 12345678L);
- Assert.assertEquals(innerTuple.get(1), 12345679L);
- innerTuple = (Tuple) t.get(3);
- Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
- Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
- innerTuple = (Tuple) t.get(4);
- Assert.assertEquals(innerTuple.get(0), false);
- Assert.assertEquals(innerTuple.get(1), true);
- innerTuple = (Tuple) t.get(5);
- Assert.assertEquals(innerTuple.get(0), "23.4567");
- Assert.assertEquals(innerTuple.get(1), "23.4568");
- innerTuple = (Tuple) t.get(6);
- Assert.assertEquals(innerTuple.get(0), 12345678.12345678d);
- Assert.assertEquals(innerTuple.get(1), 12345678.12345679d);
- innerTuple = (Tuple) t.get(7);
- Assert.assertEquals(innerTuple.get(0), 123.12f);
- Assert.assertEquals(innerTuple.get(1), 123.13f);
- innerTuple = (Tuple) t.get(8);
- Assert.assertEquals(innerTuple.get(0), "127.0.0.1");
- Assert.assertEquals(innerTuple.get(1), "127.0.0.2");
- innerTuple = (Tuple) t.get(9);
- Assert.assertEquals(innerTuple.get(0), 123);
- Assert.assertEquals(innerTuple.get(1), 124);
- innerTuple = (Tuple) t.get(10);
- Assert.assertEquals(innerTuple.get(0), "text1");
- Assert.assertEquals(innerTuple.get(1), "text2");
- innerTuple = (Tuple) t.get(11);
- Assert.assertEquals(innerTuple.get(0), 1296705900000L);
- Assert.assertEquals(innerTuple.get(1), 1296792300000L);
- innerTuple = (Tuple) t.get(12);
- Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
- Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
- innerTuple = (Tuple) t.get(13);
- Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
- Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
- innerTuple = (Tuple) t.get(14);
- Assert.assertEquals(innerTuple.get(0), "varchar1");
- Assert.assertEquals(innerTuple.get(1), "varchar2");
- innerTuple = (Tuple) t.get(15);
- Assert.assertEquals(innerTuple.get(0), 123);
- Assert.assertEquals(innerTuple.get(1), 124);
- }
- }
-
- @Test
- public void testCqlStorageListType()
- throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
- {
- pig.registerQuery("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + "' USING CqlStorage();");
- Iterator<Tuple> it = pig.openIterator("list_rows");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0), 1);
- Tuple innerTuple = (Tuple) t.get(1);
- Assert.assertEquals(innerTuple.get(1), "ascii1");
- Assert.assertEquals(innerTuple.get(0), "ascii2");
- innerTuple = (Tuple) t.get(2);
- Assert.assertEquals(innerTuple.get(1), 12345678L);
- Assert.assertEquals(innerTuple.get(0), 12345679L);
- innerTuple = (Tuple) t.get(3);
- Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
- Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
- innerTuple = (Tuple) t.get(4);
- Assert.assertEquals(innerTuple.get(1), false);
- Assert.assertEquals(innerTuple.get(0), true);
- innerTuple = (Tuple) t.get(5);
- Assert.assertEquals(innerTuple.get(1), "23.4567");
- Assert.assertEquals(innerTuple.get(0), "23.4568");
- innerTuple = (Tuple) t.get(6);
- Assert.assertEquals(innerTuple.get(1), 12345678.12345678d);
- Assert.assertEquals(innerTuple.get(0), 12345678.12345679d);
- innerTuple = (Tuple) t.get(7);
- Assert.assertEquals(innerTuple.get(1), 123.12f);
- Assert.assertEquals(innerTuple.get(0), 123.13f);
- innerTuple = (Tuple) t.get(8);
- Assert.assertEquals(innerTuple.get(1), "127.0.0.1");
- Assert.assertEquals(innerTuple.get(0), "127.0.0.2");
- innerTuple = (Tuple) t.get(9);
- Assert.assertEquals(innerTuple.get(1), 123);
- Assert.assertEquals(innerTuple.get(0), 124);
- innerTuple = (Tuple) t.get(10);
- Assert.assertEquals(innerTuple.get(1), "text1");
- Assert.assertEquals(innerTuple.get(0), "text2");
- innerTuple = (Tuple) t.get(11);
- Assert.assertEquals(innerTuple.get(1), 1296705900000L);
- Assert.assertEquals(innerTuple.get(0), 1296792300000L);
- innerTuple = (Tuple) t.get(12);
- Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
- Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
- innerTuple = (Tuple) t.get(13);
- Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
- Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
- innerTuple = (Tuple) t.get(14);
- Assert.assertEquals(innerTuple.get(1), "varchar1");
- Assert.assertEquals(innerTuple.get(0), "varchar2");
- innerTuple = (Tuple) t.get(15);
- Assert.assertEquals(innerTuple.get(1), 123);
- Assert.assertEquals(innerTuple.get(0), 124);
- }
- }
-
- @Test
- public void testCqlStorageMapType()
- throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
- {
- pig.registerQuery("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + "' USING CqlStorage();");
- Iterator<Tuple> it = pig.openIterator("map_rows");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0), 1);
- Tuple innerTuple = (Tuple) ((Tuple) t.get(1)).get(0);
- Assert.assertEquals(innerTuple.get(0), "ascii1");
- Assert.assertEquals(innerTuple.get(1), "ascii2");
- innerTuple = (Tuple) ((Tuple) t.get(2)).get(0);
- Assert.assertEquals(innerTuple.get(0), 12345678L);
- Assert.assertEquals(innerTuple.get(1), 12345679L);
- innerTuple = (Tuple) ((Tuple) t.get(3)).get(0);
- Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6f")));
- Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6e")));
- innerTuple = (Tuple) ((Tuple) t.get(4)).get(0);
- Assert.assertEquals(innerTuple.get(0), false);
- Assert.assertEquals(innerTuple.get(1), true);
- innerTuple = (Tuple) ((Tuple) t.get(5)).get(0);
- Assert.assertEquals(innerTuple.get(0), "23.4567");
- Assert.assertEquals(innerTuple.get(1), "23.4568");
- innerTuple = (Tuple) ((Tuple) t.get(6)).get(0);
- Assert.assertEquals(innerTuple.get(0), 12345678.12345678d);
- Assert.assertEquals(innerTuple.get(1), 12345678.12345679d);
- innerTuple = (Tuple) ((Tuple) t.get(7)).get(0);
- Assert.assertEquals(innerTuple.get(0), 123.12f);
- Assert.assertEquals(innerTuple.get(1), 123.13f);
- innerTuple = (Tuple) ((Tuple) t.get(8)).get(0);
- Assert.assertEquals(innerTuple.get(0), "127.0.0.1");
- Assert.assertEquals(innerTuple.get(1), "127.0.0.2");
- innerTuple = (Tuple) ((Tuple) t.get(9)).get(0);
- Assert.assertEquals(innerTuple.get(0), 123);
- Assert.assertEquals(innerTuple.get(1), 124);
- innerTuple = (Tuple) ((Tuple) t.get(10)).get(0);
- Assert.assertEquals(innerTuple.get(0), "text1");
- Assert.assertEquals(innerTuple.get(1), "text2");
- innerTuple = (Tuple) ((Tuple) t.get(11)).get(0);
- Assert.assertEquals(innerTuple.get(0), 1296705900000L);
- Assert.assertEquals(innerTuple.get(1), 1296792300000L);
- innerTuple = (Tuple) ((Tuple) t.get(12)).get(0);
- Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
- Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
- innerTuple = (Tuple) ((Tuple) t.get(13)).get(0);
- Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
- Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array())));
- innerTuple = (Tuple) ((Tuple) t.get(14)).get(0);
- Assert.assertEquals(innerTuple.get(0), "varchar1");
- Assert.assertEquals(innerTuple.get(1), "varchar2");
- innerTuple = (Tuple) ((Tuple) t.get(15)).get(0);
- Assert.assertEquals(innerTuple.get(0), 123);
- Assert.assertEquals(innerTuple.get(1), 124);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/unit/org/apache/cassandra/pig/CqlTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/pig/CqlTableTest.java b/test/unit/org/apache/cassandra/pig/CqlTableTest.java
deleted file mode 100644
index 785d819..0000000
--- a/test/unit/org/apache/cassandra/pig/CqlTableTest.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.pig;
-
-import java.io.IOException;
-import java.nio.charset.CharacterCodingException;
-import java.util.Iterator;
-
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.thrift.AuthenticationException;
-import org.apache.cassandra.thrift.AuthorizationException;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.NotFoundException;
-import org.apache.cassandra.thrift.SchemaDisagreementException;
-import org.apache.cassandra.thrift.TimedOutException;
-import org.apache.cassandra.thrift.UnavailableException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class CqlTableTest extends PigTestBase
-{
- private static String[] statements = {
- "CREATE KEYSPACE cql3ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}",
- "USE cql3ks;",
-
- "CREATE TABLE cqltable (key1 text, key2 int, column1 int, column2 float, primary key(key1, key2))",
- "INSERT INTO cqltable (key1, key2, column1, column2) values ('key1', 111, 100, 10.1)",
- "CREATE TABLE compactcqltable (key1 text, column1 int, column2 float, primary key(key1)) WITH COMPACT STORAGE",
- "INSERT INTO compactcqltable (key1, column1, column2) values ('key1', 100, 10.1)",
-
- "CREATE TABLE test (a int PRIMARY KEY, b int);",
-
- "CREATE TABLE moredata (x int PRIMARY KEY, y int);",
- "INSERT INTO test (a,b) VALUES (1,1);",
- "INSERT INTO test (a,b) VALUES (2,2);",
- "INSERT INTO test (a,b) VALUES (3,3);",
- "INSERT INTO moredata (x, y) VALUES (4,4);",
- "INSERT INTO moredata (x, y) VALUES (5,5);",
- "INSERT INTO moredata (x, y) VALUES (6,6);",
-
- "CREATE TABLE compotable (a int, b int, c text, d text, PRIMARY KEY (a,b,c));",
- "INSERT INTO compotable (a, b , c , d ) VALUES ( 1,1,'One','match');",
- "INSERT INTO compotable (a, b , c , d ) VALUES ( 2,2,'Two','match');",
- "INSERT INTO compotable (a, b , c , d ) VALUES ( 3,3,'Three','match');",
- "INSERT INTO compotable (a, b , c , d ) VALUES ( 4,4,'Four','match');",
-
- "create table compmore (id int PRIMARY KEY, x int, y int, z text, data text);",
- "INSERT INTO compmore (id, x, y, z,data) VALUES (1,5,6,'Fix','nomatch');",
- "INSERT INTO compmore (id, x, y, z,data) VALUES (2,6,5,'Sive','nomatch');",
- "INSERT INTO compmore (id, x, y, z,data) VALUES (3,7,7,'Seven','match');",
- "INSERT INTO compmore (id, x, y, z,data) VALUES (4,8,8,'Eight','match');",
- "INSERT INTO compmore (id, x, y, z,data) VALUES (5,9,10,'Ninen','nomatch');",
-
- "CREATE TABLE collectiontable(m text PRIMARY KEY, n map<text, text>);",
- "UPDATE collectiontable SET n['key1'] = 'value1' WHERE m = 'book1';",
- "UPDATE collectiontable SET n['key2'] = 'value2' WHERE m = 'book2';",
- "UPDATE collectiontable SET n['key3'] = 'value3' WHERE m = 'book3';",
- "UPDATE collectiontable SET n['key4'] = 'value4' WHERE m = 'book4';",
- };
-
- @BeforeClass
- public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException,
- AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException
- {
- startCassandra();
- setupDataByCql(statements);
- startHadoopCluster();
- }
-
- @Test
- public void testCqlStorageSchema()
- throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
- {
- pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();");
- Iterator<Tuple> it = pig.openIterator("rows");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0).toString(), "key1");
- Assert.assertEquals(t.get(1), 111);
- Assert.assertEquals(t.get(2), 100);
- Assert.assertEquals(t.get(3), 10.1f);
- Assert.assertEquals(4, t.size());
- }
-
- pig.registerQuery("rows = LOAD 'cql://cql3ks/compactcqltable?" + defaultParameters + "' USING CqlStorage();");
- it = pig.openIterator("rows");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0).toString(), "key1");
- Assert.assertEquals(t.get(1), 100);
- Assert.assertEquals(t.get(2), 10.1f);
- Assert.assertEquals(3, t.size());
- }
- }
-
- @Test
- public void testCqlStorageSingleKeyTable()
- throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
- {
- pig.setBatchOn();
- pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + "' USING CqlStorage();");
- pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE TOTUPLE(TOTUPLE('a',x)),TOTUPLE(y);");
- pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test?" + defaultParameters + "&output_query=UPDATE+cql3ks.test+set+b+%3D+%3F' USING CqlStorage();");
- pig.executeBatch();
- //(5,5)
- //(6,6)
- //(4,4)
- //(2,2)
- //(3,3)
- //(1,1)
- pig.registerQuery("result= LOAD 'cql://cql3ks/test?" + defaultParameters + "' USING CqlStorage();");
- Iterator<Tuple> it = pig.openIterator("result");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0), t.get(1));
- }
- }
-
- @Test
- public void testCqlStorageCompositeKeyTable()
- throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
- {
- pig.setBatchOn();
- pig.registerQuery("moredata= LOAD 'cql://cql3ks/compmore?" + defaultParameters + "' USING CqlStorage();");
- pig.registerQuery("insertformat = FOREACH moredata GENERATE TOTUPLE (TOTUPLE('a',x),TOTUPLE('b',y), TOTUPLE('c',z)),TOTUPLE(data);");
- pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/compotable?" + defaultParameters + "&output_query=UPDATE%20cql3ks.compotable%20SET%20d%20%3D%20%3F' USING CqlStorage();");
- pig.executeBatch();
-
- //(5,6,Fix,nomatch)
- //(3,3,Three,match)
- //(1,1,One,match)
- //(2,2,Two,match)
- //(7,7,Seven,match)
- //(8,8,Eight,match)
- //(6,5,Sive,nomatch)
- //(4,4,Four,match)
- //(9,10,Ninen,nomatch)
- pig.registerQuery("result= LOAD 'cql://cql3ks/compotable?" + defaultParameters + "' USING CqlStorage();");
- Iterator<Tuple> it = pig.openIterator("result");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(3), "match");
- }
- }
-
- @Test
- public void testCqlStorageCollectionColumnTable()
- throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
- {
- pig.setBatchOn();
- pig.registerQuery("collectiontable= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();");
- pig.registerQuery("recs= FOREACH collectiontable GENERATE TOTUPLE(TOTUPLE('m', m) ), TOTUPLE(TOTUPLE('map', TOTUPLE('m', 'mm'), TOTUPLE('n', 'nn')));");
- pig.registerQuery("STORE recs INTO 'cql://cql3ks/collectiontable?" + defaultParameters + "&output_query=update+cql3ks.collectiontable+set+n+%3D+%3F' USING CqlStorage();");
- pig.executeBatch();
-
- //(book2,((m,mm),(n,nn)))
- //(book3,((m,mm),(n,nn)))
- //(book4,((m,mm),(n,nn)))
- //(book1,((m,mm),(n,nn)))
- pig.registerQuery("result= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();");
- Iterator<Tuple> it = pig.openIterator("result");
- if (it.hasNext()) {
- Tuple t = it.next();
- Tuple t1 = (Tuple) t.get(1);
- Assert.assertEquals(t1.size(), 2);
- Tuple element1 = (Tuple) t1.get(0);
- Tuple element2 = (Tuple) t1.get(1);
- Assert.assertEquals(element1.get(0), "m");
- Assert.assertEquals(element1.get(1), "mm");
- Assert.assertEquals(element2.get(0), "n");
- Assert.assertEquals(element2.get(1), "nn");
- }
- }
-
- @Test
- public void testCassandraStorageSchema() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
- {
- //results: (key1,{((111,),),((111,column1),100),((111,column2),10.1)})
- pig.registerQuery("rows = LOAD 'cassandra://cql3ks/cqltable?" + defaultParameters + "' USING CassandraStorage();");
-
- //schema: {key: chararray,columns: {(name: (),value: bytearray)}}
- Iterator<Tuple> it = pig.openIterator("rows");
- if (it.hasNext()) {
- Tuple t = it.next();
- String rowKey = t.get(0).toString();
- Assert.assertEquals(rowKey, "key1");
- DataBag columns = (DataBag) t.get(1);
- Iterator<Tuple> iter = columns.iterator();
- int i = 0;
- while(iter.hasNext())
- {
- i++;
- Tuple column = (Tuple) iter.next();
- if (i==1)
- {
- Assert.assertEquals(((Tuple) column.get(0)).get(0), 111);
- Assert.assertEquals(((Tuple) column.get(0)).get(1), "");
- Assert.assertEquals(column.get(1).toString(), "");
- }
- if (i==2)
- {
- Assert.assertEquals(((Tuple) column.get(0)).get(0), 111);
- Assert.assertEquals(((Tuple) column.get(0)).get(1), "column1");
- Assert.assertEquals(column.get(1), 100);
- }
- if (i==3)
- {
- Assert.assertEquals(((Tuple) column.get(0)).get(0), 111);
- Assert.assertEquals(((Tuple) column.get(0)).get(1), "column2");
- Assert.assertEquals(column.get(1), 10.1f);
- }
- }
- Assert.assertEquals(3, columns.size());
- }
-
- //results: (key1,(column1,100),(column2,10.1))
- pig.registerQuery("compact_rows = LOAD 'cassandra://cql3ks/compactcqltable?" + defaultParameters + "' USING CassandraStorage();");
-
- //schema: {key: chararray,column1: (name: chararray,value: int),column2: (name: chararray,value: float)}
- it = pig.openIterator("compact_rows");
- if (it.hasNext()) {
- Tuple t = it.next();
- String rowKey = t.get(0).toString();
- Assert.assertEquals(rowKey, "key1");
- Tuple column = (Tuple) t.get(1);
- Assert.assertEquals(column.get(0), "column1");
- Assert.assertEquals(column.get(1), 100);
- column = (Tuple) t.get(2);
- Assert.assertEquals(column.get(0), "column2");
- Assert.assertEquals(column.get(1), 10.1f);
- }
- }
-}