You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/11/27 17:44:34 UTC
[4/7] move pig-test out of normal unit tests (still part of test-all)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/unit/org/apache/cassandra/pig/PigTestBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/pig/PigTestBase.java b/test/unit/org/apache/cassandra/pig/PigTestBase.java
deleted file mode 100644
index ea06b8c..0000000
--- a/test/unit/org/apache/cassandra/pig/PigTestBase.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.pig;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.nio.charset.CharacterCodingException;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.cli.CliMain;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.service.EmbeddedCassandraService;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.NotFoundException;
-import org.apache.cassandra.thrift.SchemaDisagreementException;
-import org.apache.cassandra.thrift.TimedOutException;
-import org.apache.cassandra.thrift.UnavailableException;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.test.MiniCluster;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-
-public class PigTestBase extends SchemaLoader
-{
- protected static EmbeddedCassandraService cassandra;
- protected static Configuration conf;
- protected static MiniCluster cluster;
- protected static PigServer pig;
- protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner";
-
- @AfterClass
- public static void oneTimeTearDown() throws Exception {
- cluster.shutDown();
- }
-
- @Before
- public void beforeTest() throws Exception {
- pig = new PigServer(new PigContext(ExecType.LOCAL, ConfigurationUtil.toProperties(conf)));
- PigContext.initializeImportList("org.apache.cassandra.hadoop.pig");
- }
-
- @After
- public void tearDown() throws Exception {
- pig.shutdown();
- }
-
- protected static Cassandra.Client getClient() throws TTransportException
- {
- TTransport tr = new TFramedTransport(new TSocket("localhost", DatabaseDescriptor.getRpcPort()));
- TProtocol proto = new TBinaryProtocol(tr);
- Cassandra.Client client = new Cassandra.Client(proto);
- tr.open();
- return client;
- }
-
- protected static void startCassandra() throws IOException
- {
- Schema.instance.clear(); // Schema are now written on disk and will be reloaded
- cassandra = new EmbeddedCassandraService();
- cassandra.start();
- }
-
- protected static void startHadoopCluster()
- {
- cluster = MiniCluster.buildCluster();
- conf = cluster.getConfiguration();
- }
-
- protected AbstractType parseType(String type) throws IOException
- {
- try
- {
- return TypeParser.parse(type);
- }
- catch (ConfigurationException e)
- {
- throw new IOException(e);
- }
- catch (SyntaxException e)
- {
- throw new IOException(e);
- }
- }
-
- protected static void setupDataByCli(String[] statements) throws CharacterCodingException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
- {
- // new error/output streams for CliSessionState
- ByteArrayOutputStream errStream = new ByteArrayOutputStream();
- ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-
- // checking if we can connect to the running cassandra node on localhost
- CliMain.connect("127.0.0.1", 9170);
-
- // setting new output stream
- CliMain.sessionState.setOut(new PrintStream(outStream));
- CliMain.sessionState.setErr(new PrintStream(errStream));
-
- // re-creating keyspace for tests
- try
- {
- // dropping in case it exists e.g. could be left from previous run
- CliMain.processStatement("drop keyspace thriftKs;");
- }
- catch (Exception e)
- {
- }
-
- for (String statement : statements)
- {
- errStream.reset();
- System.out.println("Executing statement: " + statement);
- CliMain.processStatement(statement);
- String result = outStream.toString();
- System.out.println("result: " + result);
- outStream.reset(); // reset stream so we have only output from next statement all the time
- errStream.reset(); // no errors to the end user.
- }
- }
-
- protected static void setupDataByCql(String[] statements) throws InvalidRequestException, UnavailableException, TimedOutException, TException
- {
- Cassandra.Client client = getClient();
- // re-creating keyspace for tests
- try
- {
- // dropping in case it exists e.g. could be left from previous run
- client.execute_cql3_query(ByteBufferUtil.bytes("DROP KEYSPACE cql3ks"), Compression.NONE, ConsistencyLevel.ONE);
- }
- catch (Exception e)
- {
- }
-
- for (String statement : statements)
- {
- try
- {
- System.out.println("Executing statement: " + statement);
- client.execute_cql3_query(ByteBufferUtil.bytes(statement), Compression.NONE, ConsistencyLevel.ONE);
- }
- catch (SchemaDisagreementException e)
- {
- Assert.fail();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java b/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
deleted file mode 100644
index 7bccc23..0000000
--- a/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.pig;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.IOException;
-import java.nio.charset.CharacterCodingException;
-import java.util.Iterator;
-
-import org.apache.cassandra.db.marshal.TimeUUIDType;
-import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.thrift.AuthenticationException;
-import org.apache.cassandra.thrift.AuthorizationException;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.NotFoundException;
-import org.apache.cassandra.thrift.TimedOutException;
-import org.apache.cassandra.thrift.UnavailableException;
-import org.apache.cassandra.utils.Hex;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class ThriftColumnFamilyDataTypeTest extends PigTestBase
-{
- //AsciiType
- //LongType
- //BytesType
- //BooleanType
- //CounterColumnType
- //DecimalType
- //DoubleType
- //FloatType
- //InetAddressType
- //Int32Type
- //UTF8Type
- //DateType
- //UUIDType
- //IntegerType
- //TimeUUIDType
- //IntegerType
- //LexicalUUIDType
- private static String[] statements = {
- "create keyspace thriftKs with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and" +
- " strategy_options={replication_factor:1};",
- "use thriftKs;",
-
- "create column family SomeApp " +
- " with comparator = UTF8Type " +
- " and default_validation_class = UTF8Type " +
- " and key_validation_class = UTF8Type " +
- " and column_metadata = [" +
- "{column_name: col_ascii, validation_class: AsciiType}, " +
- "{column_name: col_long, validation_class: LongType}, " +
- "{column_name: col_bytes, validation_class: BytesType}, " +
- "{column_name: col_boolean, validation_class: BooleanType}, " +
- "{column_name: col_decimal, validation_class: DecimalType}, " +
- "{column_name: col_double, validation_class: DoubleType}, " +
- "{column_name: col_float, validation_class: FloatType}," +
- "{column_name: col_inetaddress, validation_class: InetAddressType}, " +
- "{column_name: col_int32, validation_class: Int32Type}, " +
- "{column_name: col_uft8, validation_class: UTF8Type}, " +
- "{column_name: col_date, validation_class: DateType}, " +
- "{column_name: col_uuid, validation_class: UUIDType}, " +
- "{column_name: col_integer, validation_class: IntegerType}, " +
- "{column_name: col_timeuuid, validation_class: TimeUUIDType}, " +
- "{column_name: col_lexical_uuid, validation_class: LexicalUUIDType}, " +
- "]; ",
-
- "set SomeApp['foo']['col_ascii'] = 'ascii';",
- "set SomeApp['foo']['col_boolean'] = false;",
- "set SomeApp['foo']['col_bytes'] = 'DEADBEEF';",
- "set SomeApp['foo']['col_date'] = '2011-02-03T04:05:00+0000';",
- "set SomeApp['foo']['col_decimal'] = '23.345';",
- "set SomeApp['foo']['col_double'] = '2.7182818284590451';",
- "set SomeApp['foo']['col_float'] = '23.45';",
- "set SomeApp['foo']['col_inetaddress'] = '127.0.0.1';",
- "set SomeApp['foo']['col_int32'] = 23;",
- "set SomeApp['foo']['col_integer'] = 12345;",
- "set SomeApp['foo']['col_long'] = 12345678;",
- "set SomeApp['foo']['col_lexical_uuid'] = 'e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77';",
- "set SomeApp['foo']['col_timeuuid'] = 'e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f';",
- "set SomeApp['foo']['col_uft8'] = 'hello';",
- "set SomeApp['foo']['col_uuid'] = '550e8400-e29b-41d4-a716-446655440000';",
-
- "create column family CC with " +
- "key_validation_class = UTF8Type and " +
- "default_validation_class=CounterColumnType " +
- "and comparator=UTF8Type;",
-
- "incr CC['chuck']['kick'];",
- "incr CC['chuck']['kick'];",
- "incr CC['chuck']['kick'];"
- };
-
- @BeforeClass
- public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException,
- AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException
- {
- startCassandra();
- setupDataByCli(statements);
- startHadoopCluster();
- }
-
- @Test
- public void testCassandraStorageDataType() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
- {
- pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
-
- //{key: chararray, col_ascii: (name: chararray,value: chararray),
- //col_boolean: (name: chararray,value: bytearray),
- //col_bytes: (name: chararray,value: bytearray),
- //col_date: (name: chararray,value: long),
- //col_decimal: (name: chararray,value: chararray),
- //col_double: (name: chararray,value: double),
- //col_float: (name: chararray,value: float),
- //col_inetaddress: (name: chararray,value: chararray),
- //col_int32: (name: chararray,value: int),
- //col_integer: (name: chararray,value: int),
- //col_lexical_uuid: (name: chararray,value: chararray),
- //col_long: (name: chararray,value: long),
- //col_timeuuid: (name: chararray,value: bytearray),
- //col_uft8: (name: chararray,value: chararray),
- //col_uuid: (name: chararray,value: chararray),
- //columns: {(name: chararray,value: chararray)}}
- Iterator<Tuple> it = pig.openIterator("rows");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0), "foo");
- Tuple column = (Tuple) t.get(1);
- Assert.assertEquals(column.get(1), "ascii");
- column = (Tuple) t.get(2);
- Assert.assertEquals(column.get(1), false);
- column = (Tuple) t.get(3);
- Assert.assertEquals(column.get(1), new DataByteArray(Hex.hexToBytes("DEADBEEF")));
- column = (Tuple) t.get(4);
- Assert.assertEquals(column.get(1), 1296705900000L);
- column = (Tuple) t.get(5);
- Assert.assertEquals(column.get(1), "23.345");
- column = (Tuple) t.get(6);
- Assert.assertEquals(column.get(1), 2.7182818284590451d);
- column = (Tuple) t.get(7);
- Assert.assertEquals(column.get(1), 23.45f);
- column = (Tuple) t.get(8);
- Assert.assertEquals(column.get(1), "127.0.0.1");
- column = (Tuple) t.get(9);
- Assert.assertEquals(column.get(1), 23);
- column = (Tuple) t.get(10);
- Assert.assertEquals(column.get(1), 12345);
- column = (Tuple) t.get(11);
- Assert.assertEquals(column.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array())));
- column = (Tuple) t.get(12);
- Assert.assertEquals(column.get(1), 12345678L);
- column = (Tuple) t.get(13);
- Assert.assertEquals(column.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array())));
- column = (Tuple) t.get(14);
- Assert.assertEquals(column.get(1), "hello");
- column = (Tuple) t.get(15);
- Assert.assertEquals(column.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array())));
- }
-
- pig.registerQuery("cc_rows = LOAD 'cassandra://thriftKs/CC?" + defaultParameters + "' USING CassandraStorage();");
-
- //(chuck,{(kick,3)})
- it = pig.openIterator("cc_rows");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0), "chuck");
- DataBag columns = (DataBag) t.get(1);
- Iterator<Tuple> iter = columns.iterator();
- if(iter.hasNext())
- {
- Tuple column = iter.next();
- Assert.assertEquals(column.get(0), "kick");
- Assert.assertEquals(column.get(1), 3L);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java b/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
deleted file mode 100644
index 223cbf4..0000000
--- a/test/unit/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
+++ /dev/null
@@ -1,827 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.pig;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.Iterator;
-
-import org.apache.cassandra.cli.CliMain;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.thrift.AuthenticationException;
-import org.apache.cassandra.thrift.AuthorizationException;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.ColumnOrSuperColumn;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.ColumnPath;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.NotFoundException;
-import org.apache.cassandra.thrift.TimedOutException;
-import org.apache.cassandra.thrift.UnavailableException;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.Tuple;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class ThriftColumnFamilyTest extends PigTestBase
-{
- private static String[] statements = {
- "create keyspace thriftKs with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and" +
- " strategy_options={replication_factor:1};",
- "use thriftKs;",
-
- "create column family SomeApp " +
- " with comparator = UTF8Type " +
- " and default_validation_class = UTF8Type " +
- " and key_validation_class = UTF8Type " +
- " and column_metadata = [{column_name: name, validation_class: UTF8Type, index_type: KEYS}, " +
- "{column_name: vote_type, validation_class: UTF8Type}, " +
- "{column_name: rating, validation_class: Int32Type}, " +
- "{column_name: score, validation_class: LongType}, " +
- "{column_name: percent, validation_class: FloatType}, " +
- "{column_name: atomic_weight, validation_class: DoubleType}, " +
- "{column_name: created, validation_class: DateType},]; ",
-
- "create column family CopyOfSomeApp " +
- "with key_validation_class = UTF8Type " +
- "and default_validation_class = UTF8Type " +
- "and comparator = UTF8Type " +
- "and column_metadata = " +
- "[ " +
- "{column_name: name, validation_class: UTF8Type, index_type: KEYS}, " +
- "{column_name: vote_type, validation_class: UTF8Type}, " +
- "{column_name: rating, validation_class: Int32Type}, " +
- "{column_name: score, validation_class: LongType}, " +
- "{column_name: percent, validation_class: FloatType}, " +
- "{column_name: atomic_weight, validation_class: DoubleType}, " +
- "{column_name: created, validation_class: DateType}, " +
- "];",
-
- "set SomeApp['foo']['name'] = 'User Foo';",
- "set SomeApp['foo']['vote_type'] = 'like';",
- "set SomeApp['foo']['rating'] = 8;",
- "set SomeApp['foo']['score'] = 125000;",
- "set SomeApp['foo']['percent'] = '85.0';",
- "set SomeApp['foo']['atomic_weight'] = '2.7182818284590451';",
- "set SomeApp['foo']['created'] = 1335890877;",
-
- "set SomeApp['bar']['name'] = 'User Bar';",
- "set SomeApp['bar']['vote_type'] = 'like';",
- "set SomeApp['bar']['rating'] = 9;",
- "set SomeApp['bar']['score'] = 15000;",
- "set SomeApp['bar']['percent'] = '35.0';",
- "set SomeApp['bar']['atomic_weight'] = '3.1415926535897931';",
- "set SomeApp['bar']['created'] = 1335890877;",
-
- "set SomeApp['baz']['name'] = 'User Baz';",
- "set SomeApp['baz']['vote_type'] = 'dislike';",
- "set SomeApp['baz']['rating'] = 3;",
- "set SomeApp['baz']['score'] = 512000;",
- "set SomeApp['baz']['percent'] = '95.3';",
- "set SomeApp['baz']['atomic_weight'] = '1.61803399';",
- "set SomeApp['baz']['created'] = 1335890877;",
- "set SomeApp['baz']['extra1'] = 'extra1';",
- "set SomeApp['baz']['extra2'] = 'extra2';",
- "set SomeApp['baz']['extra3'] = 'extra3';",
-
- "set SomeApp['qux']['name'] = 'User Qux';",
- "set SomeApp['qux']['vote_type'] = 'dislike';",
- "set SomeApp['qux']['rating'] = 2;",
- "set SomeApp['qux']['score'] = 12000;",
- "set SomeApp['qux']['percent'] = '64.7';",
- "set SomeApp['qux']['atomic_weight'] = '0.660161815846869';",
- "set SomeApp['qux']['created'] = 1335890877;",
- "set SomeApp['qux']['extra1'] = 'extra1';",
- "set SomeApp['qux']['extra2'] = 'extra2';",
- "set SomeApp['qux']['extra3'] = 'extra3';",
- "set SomeApp['qux']['extra4'] = 'extra4';",
- "set SomeApp['qux']['extra5'] = 'extra5';",
- "set SomeApp['qux']['extra6'] = 'extra6';",
- "set SomeApp['qux']['extra7'] = 'extra7';",
-
- "create column family U8 with " +
- "key_validation_class = UTF8Type and " +
- "comparator = UTF8Type;",
-
- "create column family Bytes with " +
- "key_validation_class = BytesType and " +
- "comparator = UTF8Type;",
-
- "set U8['foo']['x'] = ascii('Z');",
- "set Bytes[ascii('foo')]['x'] = ascii('Z');",
-
- "create column family CC with " +
- "key_validation_class = UTF8Type and " +
- "default_validation_class=CounterColumnType " +
- "and comparator=UTF8Type;",
-
- "incr CC['chuck']['kick'];",
- "incr CC['chuck']['kick'];",
- "incr CC['chuck']['kick'];",
- "incr CC['chuck']['fist'];",
-
- "create column family Compo " +
- "with key_validation_class = UTF8Type " +
- "and default_validation_class = UTF8Type " +
- "and comparator = 'CompositeType(UTF8Type,UTF8Type)';",
-
- "set Compo['punch']['bruce:lee'] = 'ouch';",
- "set Compo['punch']['bruce:bruce'] = 'hunh?';",
- "set Compo['kick']['bruce:lee'] = 'oww';",
- "set Compo['kick']['bruce:bruce'] = 'watch it, mate';",
-
- "create column family CompoInt " +
- "with key_validation_class = UTF8Type " +
- "and default_validation_class = UTF8Type " +
- "and comparator = 'CompositeType(LongType,LongType)';",
-
- "set CompoInt['clock']['1:0'] = 'z';",
- "set CompoInt['clock']['1:30'] = 'zzzz';",
- "set CompoInt['clock']['2:30'] = 'daddy?';",
- "set CompoInt['clock']['6:30'] = 'coffee...';",
-
- "create column family CompoIntCopy " +
- "with key_validation_class = UTF8Type " +
- "and default_validation_class = UTF8Type " +
- "and comparator = 'CompositeType(LongType,LongType)';",
-
- "create column family CompoKey " +
- "with key_validation_class = 'CompositeType(UTF8Type,LongType)' " +
- "and default_validation_class = UTF8Type " +
- "and comparator = LongType;",
-
- "set CompoKey['clock:10']['1'] = 'z';",
- "set CompoKey['clock:20']['1'] = 'zzzz';",
- "set CompoKey['clock:30']['2'] = 'daddy?';",
- "set CompoKey['clock:40']['6'] = 'coffee...';",
-
- "create column family CompoKeyCopy " +
- "with key_validation_class = 'CompositeType(UTF8Type,LongType)' " +
- "and default_validation_class = UTF8Type " +
- "and comparator = LongType;"
- };
-
- @BeforeClass
- public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException,
- AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException
- {
- startCassandra();
- setupDataByCli(statements);
- startHadoopCluster();
- }
-
- @Test
- public void testCqlStorage() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
- {
- //regular thrift column families
- pig.registerQuery("data = load 'cql://thriftKs/SomeApp?" + defaultParameters + "' using CqlStorage();");
-
- //(bar,3.141592653589793,1335890877,User Bar,35.0,9,15000,like)
- //(baz,1.61803399,1335890877,User Baz,95.3,3,512000,dislike)
- //(foo,2.718281828459045,1335890877,User Foo,85.0,8,125000,like)
- //(qux,0.660161815846869,1335890877,User Qux,64.7,2,12000,dislike)
-
- //{key: chararray,atomic_weight: double,created: long,name: chararray,percent: float,rating: int,score: long,vote_type: chararray}
- Iterator<Tuple> it = pig.openIterator("data");
- int count = 0;
- while (it.hasNext()) {
- count ++;
- Tuple t = it.next();
- if (count == 1)
- {
- Assert.assertEquals(t.get(0), "bar");
- Assert.assertEquals(t.get(1), 3.141592653589793d);
- Assert.assertEquals(t.get(3), "User Bar");
- Assert.assertEquals(t.get(4), 35.0f);
- Assert.assertEquals(t.get(5), 9);
- Assert.assertEquals(t.get(6), 15000L);
- Assert.assertEquals(t.get(7), "like");
- }
- else if (count == 2)
- {
- Assert.assertEquals(t.get(0), "baz");
- Assert.assertEquals(t.get(1), 1.61803399d);
- Assert.assertEquals(t.get(3), "User Baz");
- Assert.assertEquals(t.get(4), 95.3f);
- Assert.assertEquals(t.get(5), 3);
- Assert.assertEquals(t.get(6), 512000L);
- Assert.assertEquals(t.get(7), "dislike");
- }else if (count == 3)
- {
- Assert.assertEquals(t.get(0), "foo");
- Assert.assertEquals(t.get(1), 2.718281828459045d);
- Assert.assertEquals(t.get(3), "User Foo");
- Assert.assertEquals(t.get(4), 85.0f);
- Assert.assertEquals(t.get(5), 8);
- Assert.assertEquals(t.get(6), 125000L);
- Assert.assertEquals(t.get(7), "like");
- }
- else if (count == 4)
- {
- Assert.assertEquals(t.get(0), "qux");
- Assert.assertEquals(t.get(1), 0.660161815846869d);
- Assert.assertEquals(t.get(3), "User Qux");
- Assert.assertEquals(t.get(4), 64.7f);
- Assert.assertEquals(t.get(5), 2);
- Assert.assertEquals(t.get(6), 12000L);
- Assert.assertEquals(t.get(7), "dislike");
- }
- }
- Assert.assertEquals(count, 4);
-
- //Test counter colun family
- pig.registerQuery("cc_data = load 'cql://thriftKs/CC?" + defaultParameters + "' using CqlStorage();");
-
- //(chuck,fist,1)
- //(chuck,kick,3)
-
- // {key: chararray,column1: chararray,value: long}
- it = pig.openIterator("cc_data");
- count = 0;
- while (it.hasNext()) {
- count ++;
- Tuple t = it.next();
- if (count == 1)
- {
- Assert.assertEquals(t.get(0), "chuck");
- Assert.assertEquals(t.get(1), "fist");
- Assert.assertEquals(t.get(2), 1L);
- }
- else if (count == 2)
- {
- Assert.assertEquals(t.get(0), "chuck");
- Assert.assertEquals(t.get(1), "kick");
- Assert.assertEquals(t.get(2), 3L);
- }
- }
- Assert.assertEquals(count, 2);
-
- //Test composite column family
- pig.registerQuery("compo_data = load 'cql://thriftKs/Compo?" + defaultParameters + "' using CqlStorage();");
-
- //(kick,bruce,bruce,watch it, mate)
- //(kick,bruce,lee,oww)
- //(punch,bruce,bruce,hunh?)
- //(punch,bruce,lee,ouch)
-
- //{key: chararray,column1: chararray,column2: chararray,value: chararray}
- it = pig.openIterator("compo_data");
- count = 0;
- while (it.hasNext()) {
- count ++;
- Tuple t = it.next();
- if (count == 1)
- {
- Assert.assertEquals(t.get(0), "kick");
- Assert.assertEquals(t.get(1), "bruce");
- Assert.assertEquals(t.get(2), "bruce");
- Assert.assertEquals(t.get(3), "watch it, mate");
- }
- else if (count == 2)
- {
- Assert.assertEquals(t.get(0), "kick");
- Assert.assertEquals(t.get(1), "bruce");
- Assert.assertEquals(t.get(2), "lee");
- Assert.assertEquals(t.get(3), "oww");
- }
- else if (count == 3)
- {
- Assert.assertEquals(t.get(0), "punch");
- Assert.assertEquals(t.get(1), "bruce");
- Assert.assertEquals(t.get(2), "bruce");
- Assert.assertEquals(t.get(3), "hunh?");
- }
- else if (count == 4)
- {
- Assert.assertEquals(t.get(0), "punch");
- Assert.assertEquals(t.get(1), "bruce");
- Assert.assertEquals(t.get(2), "lee");
- Assert.assertEquals(t.get(3), "ouch");
- }
- }
- Assert.assertEquals(count, 4);
- }
-
- @Test
- public void testCassandraStorageSchema() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
- {
- //results: (qux,(atomic_weight,0.660161815846869),(created,1335890877),(name,User Qux),(percent,64.7),
- //(rating,2),(score,12000),(vote_type,dislike),{(extra1,extra1),
- //(extra2,extra2),(extra3,extra3),
- //(extra4,extra4),(extra5,extra5),
- //(extra6,extra6),(extra7,extra7)})
- pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
-
- //schema: {key: chararray,atomic_weight: (name: chararray,value: double),created: (name: chararray,value: long),
- //name: (name: chararray,value: chararray),percent: (name: chararray,value: float),
- //rating: (name: chararray,value: int),score: (name: chararray,value: long),
- //vote_type: (name: chararray,value: chararray),columns: {(name: chararray,value: chararray)}}
- Iterator<Tuple> it = pig.openIterator("rows");
- int count = 0;
- if (it.hasNext()) {
- Tuple t = it.next();
- String rowKey = t.get(0).toString();
- if ("qux".equals(rowKey))
- {
- Tuple column = (Tuple) t.get(1);
- Assert.assertEquals(column.get(0), "atomic_weight");
- Assert.assertEquals(column.get(1), 0.660161815846869d);
- column = (Tuple) t.get(3);
- Assert.assertEquals(column.get(0), "name");
- Assert.assertEquals(column.get(1), "User Qux");
- column = (Tuple) t.get(4);
- Assert.assertEquals(column.get(0), "percent");
- Assert.assertEquals(column.get(1), 64.7f);
- column = (Tuple) t.get(5);
- Assert.assertEquals(column.get(0), "rating");
- Assert.assertEquals(column.get(1), 2);
- column = (Tuple) t.get(6);
- Assert.assertEquals(column.get(0), "score");
- Assert.assertEquals(column.get(1), 12000L);
- column = (Tuple) t.get(7);
- Assert.assertEquals(column.get(0), "vote_type");
- Assert.assertEquals(column.get(1), "dislike");
- DataBag columns = (DataBag) t.get(8);
- Iterator<Tuple> iter = columns.iterator();
- int i = 0;
- while(iter.hasNext())
- {
- i++;
- column = iter.next();
- Assert.assertEquals(column.get(0), "extra"+i);
- }
- Assert.assertEquals(7, columns.size());
- }
-
- }
- }
-
- @Test
- public void testCassandraStorageFullCopy() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
- {
- createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
- pig.setBatchOn();
- pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
- //full copy
- pig.registerQuery("STORE rows INTO 'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING CassandraStorage();");
- pig.executeBatch();
- Assert.assertEquals("User Qux", getColumnValue("thriftKs", "CopyOfSomeApp", "name", "qux", "UTF8Type"));
- Assert.assertEquals("dislike", getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "qux", "UTF8Type"));
- Assert.assertEquals("64.7", getColumnValue("thriftKs", "CopyOfSomeApp", "percent", "qux", "FloatType"));
- }
-
- @Test
- public void testCassandraStorageSigleTupleCopy() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
- {
- createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
- pig.setBatchOn();
- pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
- //sigle tuple
- pig.registerQuery("onecol = FOREACH rows GENERATE key, percent;");
- pig.registerQuery("STORE onecol INTO 'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING CassandraStorage();");
- pig.executeBatch();
- String value = null;
- try
- {
- value = getColumnValue("thriftKs", "CopyOfSomeApp", "name", "qux", "UTF8Type");
- }
- catch (NotFoundException e)
- {
- Assert.assertTrue(true);
- }
- if (value != null)
- Assert.fail();
- try
- {
- value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "qux", "UTF8Type");
- }
- catch (NotFoundException e)
- {
- Assert.assertTrue(true);
- }
- if (value != null)
- Assert.fail();
- Assert.assertEquals("64.7", getColumnValue("thriftKs", "CopyOfSomeApp", "percent", "qux", "FloatType"));
- }
-
- @Test
- public void testCassandraStorageBagOnlyCopy() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
- {
- createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
- pig.setBatchOn();
- pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
- //bag only
- pig.registerQuery("other = FOREACH rows GENERATE key, columns;");
- pig.registerQuery("STORE other INTO 'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING CassandraStorage();");
- pig.executeBatch();
- String value = null;
- try
- {
- value = getColumnValue("thriftKs", "CopyOfSomeApp", "name", "qux", "UTF8Type");
- }
- catch (NotFoundException e)
- {
- Assert.assertTrue(true);
- }
- if (value != null)
- Assert.fail();
- try
- {
- value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "qux", "UTF8Type");
- }
- catch (NotFoundException e)
- {
- Assert.assertTrue(true);
- }
- if (value != null)
- Assert.fail();
- try
- {
- value = getColumnValue("thriftKs", "CopyOfSomeApp", "percent", "qux", "FloatType");
- }
- catch (NotFoundException e)
- {
- Assert.assertTrue(true);
- }
- if (value != null)
- Assert.fail();
- Assert.assertEquals("extra1", getColumnValue("thriftKs", "CopyOfSomeApp", "extra1", "qux", "UTF8Type"));
- }
-
- @Test
- public void testCassandraStorageFilter() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
- {
- createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
- pig.setBatchOn();
- pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
-
- //filter
- pig.registerQuery("likes = FILTER rows by vote_type.value eq 'like' and rating.value > 5;");
- pig.registerQuery("STORE likes INTO 'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING CassandraStorage();");
- pig.executeBatch();
-
- Assert.assertEquals("like", getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "bar", "UTF8Type"));
- Assert.assertEquals("like", getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "foo", "UTF8Type"));
- String value = null;
- try
- {
- value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "qux", "UTF8Type");
- }
- catch (NotFoundException e)
- {
- Assert.assertTrue(true);
- }
- if (value != null)
- Assert.fail();
- try
- {
- value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "baz", "UTF8Type");
- }
- catch (NotFoundException e)
- {
- Assert.assertTrue(true);
- }
- if (value != null)
- Assert.fail();
-
- createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
- pig.setBatchOn();
- pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
- pig.registerQuery("dislikes_extras = FILTER rows by vote_type.value eq 'dislike' AND COUNT(columns) > 0;");
- pig.registerQuery("STORE dislikes_extras INTO 'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING CassandraStorage();");
- pig.registerQuery("visible = FILTER rows BY COUNT(columns) == 0;");
- pig.executeBatch();
- Assert.assertEquals("dislike", getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "baz", "UTF8Type"));
- Assert.assertEquals("dislike", getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "qux", "UTF8Type"));
- value = null;
- try
- {
- value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "bar", "UTF8Type");
- }
- catch (NotFoundException e)
- {
- Assert.assertTrue(true);
- }
- if (value != null)
- Assert.fail();
- try
- {
- value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", "foo", "UTF8Type");
- }
- catch (NotFoundException e)
- {
- Assert.assertTrue(true);
- }
- if (value != null)
- Assert.fail();
- }
-
- @Test
- public void testCassandraStorageJoin() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
- {
- //test key types with a join
- pig.registerQuery("U8 = load 'cassandra://thriftKs/U8?" + defaultParameters + "' using CassandraStorage();");
- pig.registerQuery("Bytes = load 'cassandra://thriftKs/Bytes?" + defaultParameters + "' using CassandraStorage();");
-
- //cast key to chararray
- pig.registerQuery("b = foreach Bytes generate (chararray)key, columns;");
-
- //key in Bytes is a bytearray, U8 chararray
- //(foo,{(x,Z)},foo,{(x,Z)})
- pig.registerQuery("a = join Bytes by key, U8 by key;");
- Iterator<Tuple> it = pig.openIterator("a");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0), new DataByteArray("foo".getBytes()));
- DataBag columns = (DataBag) t.get(1);
- Iterator<Tuple> iter = columns.iterator();
- Tuple t1 = iter.next();
- Assert.assertEquals(t1.get(0), "x");
- Assert.assertEquals(t1.get(1), new DataByteArray("Z".getBytes()));
- String column = (String) t.get(2);
- Assert.assertEquals(column, "foo");
- columns = (DataBag) t.get(3);
- iter = columns.iterator();
- Tuple t2 = iter.next();
- Assert.assertEquals(t2.get(0), "x");
- Assert.assertEquals(t2.get(1), new DataByteArray("Z".getBytes()));
- }
- //key should now be cast into a chararray
- //(foo,{(x,Z)},foo,{(x,Z)})
- pig.registerQuery("c = join b by (chararray)key, U8 by (chararray)key;");
- it = pig.openIterator("c");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0), "foo");
- DataBag columns = (DataBag) t.get(1);
- Iterator<Tuple> iter = columns.iterator();
- Tuple t1 = iter.next();
- Assert.assertEquals(t1.get(0), "x");
- Assert.assertEquals(t1.get(1), new DataByteArray("Z".getBytes()));
- String column = (String) t.get(2);
- Assert.assertEquals(column, "foo");
- columns = (DataBag) t.get(3);
- iter = columns.iterator();
- Tuple t2 = iter.next();
- Assert.assertEquals(t2.get(0), "x");
- Assert.assertEquals(t2.get(1), new DataByteArray("Z".getBytes()));
- }
- }
-
- @Test
- public void testCassandraStorageCounterCF() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
- {
- pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();");
-
- //Test counter column family support
- pig.registerQuery("CC = load 'cassandra://thriftKs/CC?" + defaultParameters + "' using CassandraStorage();");
- pig.registerQuery("total_hits = foreach CC generate key, SUM(columns.value);");
- //(chuck,4)
- Iterator<Tuple> it = pig.openIterator("total_hits");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0), "chuck");
- Assert.assertEquals(t.get(1), 4l);
- }
- }
-
- @Test
- public void testCassandraStorageCompositeColumnCF() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
- {
- //Test CompositeType
- pig.registerQuery("compo = load 'cassandra://thriftKs/Compo?" + defaultParameters + "' using CassandraStorage();");
- pig.registerQuery("compo = foreach compo generate key as method, flatten(columns);");
- pig.registerQuery("lee = filter compo by columns::name == ('bruce','lee');");
-
- //(kick,(bruce,lee),oww)
- //(punch,(bruce,lee),ouch)
- Iterator<Tuple> it = pig.openIterator("lee");
- int count = 0;
- while (it.hasNext()) {
- count ++;
- Tuple t = it.next();
- if (count == 1)
- Assert.assertEquals(t.get(0), "kick");
- else
- Assert.assertEquals(t.get(0), "punch");
- Tuple t1 = (Tuple) t.get(1);
- Assert.assertEquals(t1.get(0), "bruce");
- Assert.assertEquals(t1.get(1), "lee");
- if (count == 1)
- Assert.assertEquals(t.get(2), "oww");
- else
- Assert.assertEquals(t.get(2), "ouch");
- }
- Assert.assertEquals(count, 2);
- pig.registerQuery("night = load 'cassandra://thriftKs/CompoInt?" + defaultParameters + "' using CassandraStorage();");
- pig.registerQuery("night = foreach night generate flatten(columns);");
- pig.registerQuery("night = foreach night generate (int)columns::name.$0+(double)columns::name.$1/60 as hour, columns::value as noise;");
-
- //What happens at the darkest hour?
- pig.registerQuery("darkest = filter night by hour > 2 and hour < 5;");
-
- //(2.5,daddy?)
- it = pig.openIterator("darkest");
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0), 2.5d);
- Assert.assertEquals(t.get(1), "daddy?");
- }
- pig.setBatchOn();
- pig.registerQuery("compo_int_rows = LOAD 'cassandra://thriftKs/CompoInt?" + defaultParameters + "' using CassandraStorage();");
- pig.registerQuery("STORE compo_int_rows INTO 'cassandra://thriftKs/CompoIntCopy?" + defaultParameters + "' using CassandraStorage();");
- pig.executeBatch();
- pig.registerQuery("compocopy_int_rows = LOAD 'cassandra://thriftKs/CompoIntCopy?" + defaultParameters + "' using CassandraStorage();");
- //(clock,{((1,0),z),((1,30),zzzz),((2,30),daddy?),((6,30),coffee...)})
- it = pig.openIterator("compocopy_int_rows");
- count = 0;
- if (it.hasNext()) {
- Tuple t = it.next();
- Assert.assertEquals(t.get(0), "clock");
- DataBag columns = (DataBag) t.get(1);
- Iterator<Tuple> iter = columns.iterator();
- while (iter.hasNext())
- {
- count ++;
- Tuple t1 = iter.next();
- Tuple inner = (Tuple) t1.get(0);
- if (count == 1)
- {
- Assert.assertEquals(inner.get(0), 1L);
- Assert.assertEquals(inner.get(1), 0L);
- Assert.assertEquals(t1.get(1), "z");
- }
- else if (count == 2)
- {
- Assert.assertEquals(inner.get(0), 1L);
- Assert.assertEquals(inner.get(1), 30L);
- Assert.assertEquals(t1.get(1), "zzzz");
- }
- else if (count == 3)
- {
- Assert.assertEquals(inner.get(0), 2L);
- Assert.assertEquals(inner.get(1), 30L);
- Assert.assertEquals(t1.get(1), "daddy?");
- }
- else if (count == 4)
- {
- Assert.assertEquals(inner.get(0), 6L);
- Assert.assertEquals(inner.get(1), 30L);
- Assert.assertEquals(t1.get(1), "coffee...");
- }
- }
- Assert.assertEquals(count, 4);
- }
- }
-
- @Test
- public void testCassandraStorageCompositeKeyCF() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException
- {
- //Test CompositeKey
- pig.registerQuery("compokeys = load 'cassandra://thriftKs/CompoKey?" + defaultParameters + "' using CassandraStorage();");
- pig.registerQuery("compokeys = filter compokeys by key.$1 == 40;");
- //((clock,40),{(6,coffee...)})
- Iterator<Tuple> it = pig.openIterator("compokeys");
- if (it.hasNext()) {
- Tuple t = it.next();
- Tuple key = (Tuple) t.get(0);
- Assert.assertEquals(key.get(0), "clock");
- Assert.assertEquals(key.get(1), 40L);
- DataBag columns = (DataBag) t.get(1);
- Iterator<Tuple> iter = columns.iterator();
- if (iter.hasNext())
- {
- Tuple t1 = iter.next();
- Assert.assertEquals(t1.get(0), 6L);
- Assert.assertEquals(t1.get(1), "coffee...");
- }
- }
- pig.setBatchOn();
- pig.registerQuery("compo_key_rows = LOAD 'cassandra://thriftKs/CompoKey?" + defaultParameters + "' using CassandraStorage();");
- pig.registerQuery("STORE compo_key_rows INTO 'cassandra://thriftKs/CompoKeyCopy?" + defaultParameters + "' using CassandraStorage();");
- pig.executeBatch();
- pig.registerQuery("compo_key_copy_rows = LOAD 'cassandra://thriftKs/CompoKeyCopy?" + defaultParameters + "' using CassandraStorage();");
- //((clock,10),{(1,z)})
- //((clock,20),{(1,zzzz)})
- //((clock,30),{(2,daddy?)})
- //((clock,40),{(6,coffee...)})
- it = pig.openIterator("compo_key_copy_rows");
- int count = 0;
- while (it.hasNext()) {
- Tuple t = it.next();
- count ++;
- if (count == 1)
- {
- Tuple key = (Tuple) t.get(0);
- Assert.assertEquals(key.get(0), "clock");
- Assert.assertEquals(key.get(1), 10L);
- DataBag columns = (DataBag) t.get(1);
- Iterator<Tuple> iter = columns.iterator();
- if (iter.hasNext())
- {
- Tuple t1 = iter.next();
- Assert.assertEquals(t1.get(0), 1L);
- Assert.assertEquals(t1.get(1), "z");
- }
- }
- else if (count == 2)
- {
- Tuple key = (Tuple) t.get(0);
- Assert.assertEquals(key.get(0), "clock");
- Assert.assertEquals(key.get(1), 20L);
- DataBag columns = (DataBag) t.get(1);
- Iterator<Tuple> iter = columns.iterator();
- if (iter.hasNext())
- {
- Tuple t1 = iter.next();
- Assert.assertEquals(t1.get(0), 1L);
- Assert.assertEquals(t1.get(1), "zzzz");
- }
- }
- else if (count == 3)
- {
- Tuple key = (Tuple) t.get(0);
- Assert.assertEquals(key.get(0), "clock");
- Assert.assertEquals(key.get(1), 30L);
- DataBag columns = (DataBag) t.get(1);
- Iterator<Tuple> iter = columns.iterator();
- if (iter.hasNext())
- {
- Tuple t1 = iter.next();
- Assert.assertEquals(t1.get(0), 2L);
- Assert.assertEquals(t1.get(1), "daddy?");
- }
- }
- else if (count == 4)
- {
- Tuple key = (Tuple) t.get(0);
- Assert.assertEquals(key.get(0), "clock");
- Assert.assertEquals(key.get(1), 40L);
- DataBag columns = (DataBag) t.get(1);
- Iterator<Tuple> iter = columns.iterator();
- if (iter.hasNext())
- {
- Tuple t1 = iter.next();
- Assert.assertEquals(t1.get(0), 6L);
- Assert.assertEquals(t1.get(1), "coffee...");
- }
- }
- }
- Assert.assertEquals(count, 4);
- }
-
- private String getColumnValue(String ks, String cf, String colName, String key, String validator)
- throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, IOException
- {
- Cassandra.Client client = getClient();
- client.set_keyspace(ks);
-
- ByteBuffer key_user_id = ByteBufferUtil.bytes(key);
-
- long timestamp = System.currentTimeMillis();
- ColumnPath cp = new ColumnPath(cf);
- ColumnParent par = new ColumnParent(cf);
- cp.column = ByteBufferUtil.bytes(colName);
-
- // read
- ColumnOrSuperColumn got = client.get(key_user_id, cp, ConsistencyLevel.ONE);
- return parseType(validator).getString(got.getColumn().value);
- }
-
- private void createColumnFamily(String ks, String cf, String statement) throws CharacterCodingException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
- {
- CliMain.connect("127.0.0.1", 9170);
- try
- {
- CliMain.processStatement("use " + ks + ";");
- CliMain.processStatement("drop column family " + cf + ";");
- }
- catch (Exception e)
- {
- }
- CliMain.processStatement(statement);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/unit/org/apache/pig/test/MiniCluster.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/pig/test/MiniCluster.java b/test/unit/org/apache/pig/test/MiniCluster.java
deleted file mode 100644
index 3216392..0000000
--- a/test/unit/org/apache/pig/test/MiniCluster.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.test;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.MiniMRCluster;
-
-public class MiniCluster extends MiniGenericCluster {
- private MiniMRCluster m_mr = null;
- public MiniCluster() {
- super();
- }
-
- @Override
- protected void setupMiniDfsAndMrClusters() {
- try {
- System.setProperty("hadoop.log.dir", "build/test/logs");
- final int dataNodes = 4; // There will be 4 data nodes
- final int taskTrackers = 4; // There will be 4 task tracker nodes
-
- // Create the configuration hadoop-site.xml file
- File conf_dir = new File("build/classes/");
- conf_dir.mkdirs();
- File conf_file = new File(conf_dir, "hadoop-site.xml");
-
- conf_file.delete();
-
- // Builds and starts the mini dfs and mapreduce clusters
- Configuration config = new Configuration();
- m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
- m_fileSys = m_dfs.getFileSystem();
- m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1);
-
- // Write the necessary config info to hadoop-site.xml
- m_conf = m_mr.createJobConf();
- m_conf.setInt("mapred.submit.replication", 2);
- m_conf.set("dfs.datanode.address", "0.0.0.0:0");
- m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
- m_conf.set("mapred.map.max.attempts", "2");
- m_conf.set("mapred.reduce.max.attempts", "2");
- m_conf.set("pig.jobcontrol.sleep", "100");
- m_conf.writeXml(new FileOutputStream(conf_file));
-
- // Set the system properties needed by Pig
- System.setProperty("cluster", m_conf.get("mapred.job.tracker"));
- System.setProperty("namenode", m_conf.get("fs.default.name"));
- System.setProperty("junit.hadoop.conf", conf_dir.getPath());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- protected void shutdownMiniMrClusters() {
- if (m_mr != null) { m_mr.shutdown(); }
- m_mr = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/unit/org/apache/pig/test/MiniGenericCluster.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/pig/test/MiniGenericCluster.java b/test/unit/org/apache/pig/test/MiniGenericCluster.java
deleted file mode 100644
index ac3f5bc..0000000
--- a/test/unit/org/apache/pig/test/MiniGenericCluster.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.test;
-
-import java.io.*;
-import java.util.Properties;
-
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-
-/**
- * This class builds a single instance of itself with the Singleton
- * design pattern. While building the single instance, it sets up a
- * mini cluster that actually consists of a mini DFS cluster and a
- * mini MapReduce cluster on the local machine and also sets up the
- * environment for Pig to run on top of the mini cluster.
- *
- * This class is the base class for MiniCluster, which has slightly
- * difference among different versions of hadoop. MiniCluster implementation
- * is located in $PIG_HOME/shims.
- */
-abstract public class MiniGenericCluster {
- protected MiniDFSCluster m_dfs = null;
- protected FileSystem m_fileSys = null;
- protected Configuration m_conf = null;
-
- protected final static MiniCluster INSTANCE = new MiniCluster();
- protected static boolean isSetup = true;
-
- protected MiniGenericCluster() {
- setupMiniDfsAndMrClusters();
- }
-
- abstract protected void setupMiniDfsAndMrClusters();
-
- /**
- * Returns the single instance of class MiniClusterBuilder that
- * represents the resouces for a mini dfs cluster and a mini
- * mapreduce cluster.
- */
- public static MiniCluster buildCluster() {
- if(! isSetup){
- INSTANCE.setupMiniDfsAndMrClusters();
- isSetup = true;
- }
- return INSTANCE;
- }
-
- public void shutDown(){
- INSTANCE.shutdownMiniDfsAndMrClusters();
- }
-
- protected void finalize() {
- shutdownMiniDfsAndMrClusters();
- }
-
- protected void shutdownMiniDfsAndMrClusters() {
- isSetup = false;
- shutdownMiniDfsClusters();
- shutdownMiniMrClusters();
- }
-
- protected void shutdownMiniDfsClusters() {
- try {
- if (m_fileSys != null) { m_fileSys.close(); }
- } catch (IOException e) {
- e.printStackTrace();
- }
- if (m_dfs != null) { m_dfs.shutdown(); }
- m_fileSys = null;
- m_dfs = null;
- }
-
- abstract protected void shutdownMiniMrClusters();
-
- public Properties getProperties() {
- errorIfNotSetup();
- return ConfigurationUtil.toProperties(m_conf);
- }
-
- public Configuration getConfiguration() {
- return new Configuration(m_conf);
- }
-
- public void setProperty(String name, String value) {
- errorIfNotSetup();
- m_conf.set(name, value);
- }
-
- public FileSystem getFileSystem() {
- errorIfNotSetup();
- return m_fileSys;
- }
-
- /**
- * Throw RunTimeException if isSetup is false
- */
- private void errorIfNotSetup(){
- if(isSetup)
- return;
- String msg = "function called on MiniCluster that has been shutdown";
- throw new RuntimeException(msg);
- }
-}