You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by kh...@apache.org on 2012/06/22 01:40:29 UTC
svn commit: r1352738 [2/2] - in /incubator/hcatalog/trunk: ./
src/java/org/apache/hcatalog/cli/ src/java/org/apache/hcatalog/data/schema/
src/java/org/apache/hcatalog/data/transfer/
src/java/org/apache/hcatalog/data/transfer/impl/ src/java/org/apache/h...
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestReaderWriter.java Fri Jun 22 01:40:27 2012
@@ -52,134 +52,143 @@ import org.junit.Test;
public class TestReaderWriter {
- @Test
- public void test() throws MetaException, CommandNeedRetryException, IOException, ClassNotFoundException {
-
- HiveConf conf = new HiveConf(getClass());
- Driver driver = new Driver(conf);
- SessionState.start(new CliSessionState(conf));
- driver.run("drop table mytbl");
- driver.run("create table mytbl (a string, b int)");
- Iterator<Entry<String,String>> itr = conf.iterator();
- Map<String,String> map = new HashMap<String, String>();
- while(itr.hasNext()){
- Entry<String,String> kv = itr.next();
- map.put(kv.getKey(), kv.getValue());
- }
-
- WriterContext cntxt = runsInMaster(map);
-
- File writeCntxtFile = File.createTempFile("hcat-write", "temp");
- writeCntxtFile.deleteOnExit();
-
- // Serialize context.
- ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
- oos.writeObject(cntxt);
- oos.flush();
- oos.close();
-
- // Now, deserialize it.
- ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
- cntxt = (WriterContext) ois.readObject();
- ois.close();
-
- runsInSlave(cntxt);
- commit(map, true, cntxt);
-
- ReaderContext readCntxt = runsInMaster(map, false);
-
- File readCntxtFile = File.createTempFile("hcat-read", "temp");
- readCntxtFile.deleteOnExit();
- oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
- oos.writeObject(readCntxt);
- oos.flush();
- oos.close();
-
- ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
- readCntxt = (ReaderContext) ois.readObject();
- ois.close();
-
-
- for(InputSplit split : readCntxt.getSplits()){
- runsInSlave(split, readCntxt.getConf());
- }
- }
-
- private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
-
- WriteEntity.Builder builder = new WriteEntity.Builder();
- WriteEntity entity = builder.withTable("mytbl").build();
- HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
- WriterContext info = writer.prepareWrite();
- return info;
- }
-
- private ReaderContext runsInMaster(Map<String,String> config, boolean bogus) throws HCatException {
-
- ReadEntity.Builder builder = new ReadEntity.Builder();
- ReadEntity entity = builder.withTable("mytbl").build();
- HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
- ReaderContext cntxt = reader.prepareRead();
- return cntxt;
- }
-
- private void runsInSlave(InputSplit split, Configuration config) throws HCatException {
-
- HCatReader reader = DataTransferFactory.getHCatReader(split, config);
- Iterator<HCatRecord> itr = reader.read();
- int i = 1;
- while(itr.hasNext()){
- HCatRecord read = itr.next();
- HCatRecord written = getRecord(i++);
- // Argh, HCatRecord doesnt implement equals()
- Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0), written.get(0).equals(read.get(0)));
- Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1), written.get(1).equals(read.get(1)));
- Assert.assertEquals(2, read.size());
- }
- Assert.assertFalse(itr.hasNext());
- }
-
- private void runsInSlave(WriterContext context) throws HCatException {
-
- HCatWriter writer = DataTransferFactory.getHCatWriter(context);
- writer.write(new HCatRecordItr());
- }
-
- private void commit(Map<String, String> config, boolean status, WriterContext context) throws IOException {
-
- WriteEntity.Builder builder = new WriteEntity.Builder();
- WriteEntity entity = builder.withTable("mytbl").build();
- HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
- if(status){
- writer.commit(context);
- } else {
- writer.abort(context);
- }
- }
-
- private static HCatRecord getRecord(int i) {
- List<Object> list = new ArrayList<Object>(2);
- list.add("Row #: " + i);
- list.add(i);
- return new DefaultHCatRecord(list);
- }
-
- private static class HCatRecordItr implements Iterator<HCatRecord> {
-
- int i = 0;
- @Override
- public boolean hasNext() {
- return i++ < 100 ? true : false;
- }
-
- @Override
- public HCatRecord next() {
- return getRecord(i);
- }
-
- @Override
- public void remove() {
- throw new RuntimeException();
- }
- }
+ @Test
+ public void test() throws MetaException, CommandNeedRetryException,
+ IOException, ClassNotFoundException {
+
+ HiveConf conf = new HiveConf(getClass());
+ Driver driver = new Driver(conf);
+ SessionState.start(new CliSessionState(conf));
+ driver.run("drop table mytbl");
+ driver.run("create table mytbl (a string, b int)");
+ Iterator<Entry<String, String>> itr = conf.iterator();
+ Map<String, String> map = new HashMap<String, String>();
+ while (itr.hasNext()) {
+ Entry<String, String> kv = itr.next();
+ map.put(kv.getKey(), kv.getValue());
+ }
+
+ WriterContext cntxt = runsInMaster(map);
+
+ File writeCntxtFile = File.createTempFile("hcat-write", "temp");
+ writeCntxtFile.deleteOnExit();
+
+ // Serialize context.
+ ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(
+ writeCntxtFile));
+ oos.writeObject(cntxt);
+ oos.flush();
+ oos.close();
+
+ // Now, deserialize it.
+ ObjectInputStream ois = new ObjectInputStream(new FileInputStream(
+ writeCntxtFile));
+ cntxt = (WriterContext) ois.readObject();
+ ois.close();
+
+ runsInSlave(cntxt);
+ commit(map, true, cntxt);
+
+ ReaderContext readCntxt = runsInMaster(map, false);
+
+ File readCntxtFile = File.createTempFile("hcat-read", "temp");
+ readCntxtFile.deleteOnExit();
+ oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
+ oos.writeObject(readCntxt);
+ oos.flush();
+ oos.close();
+
+ ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
+ readCntxt = (ReaderContext) ois.readObject();
+ ois.close();
+
+ for (InputSplit split : readCntxt.getSplits()) {
+ runsInSlave(split, readCntxt.getConf());
+ }
+ }
+
+ private WriterContext runsInMaster(Map<String, String> config)
+ throws HCatException {
+
+ WriteEntity.Builder builder = new WriteEntity.Builder();
+ WriteEntity entity = builder.withTable("mytbl").build();
+ HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+ WriterContext info = writer.prepareWrite();
+ return info;
+ }
+
+ private ReaderContext runsInMaster(Map<String, String> config, boolean bogus)
+ throws HCatException {
+
+ ReadEntity.Builder builder = new ReadEntity.Builder();
+ ReadEntity entity = builder.withTable("mytbl").build();
+ HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
+ ReaderContext cntxt = reader.prepareRead();
+ return cntxt;
+ }
+
+ private void runsInSlave(InputSplit split, Configuration config)
+ throws HCatException {
+
+ HCatReader reader = DataTransferFactory.getHCatReader(split, config);
+ Iterator<HCatRecord> itr = reader.read();
+ int i = 1;
+ while (itr.hasNext()) {
+ HCatRecord read = itr.next();
+ HCatRecord written = getRecord(i++);
+ // Argh, HCatRecord doesnt implement equals()
+ Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
+ written.get(0).equals(read.get(0)));
+ Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
+ written.get(1).equals(read.get(1)));
+ Assert.assertEquals(2, read.size());
+ }
+ Assert.assertFalse(itr.hasNext());
+ }
+
+ private void runsInSlave(WriterContext context) throws HCatException {
+
+ HCatWriter writer = DataTransferFactory.getHCatWriter(context);
+ writer.write(new HCatRecordItr());
+ }
+
+ private void commit(Map<String, String> config, boolean status,
+ WriterContext context) throws IOException {
+
+ WriteEntity.Builder builder = new WriteEntity.Builder();
+ WriteEntity entity = builder.withTable("mytbl").build();
+ HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
+ if (status) {
+ writer.commit(context);
+ } else {
+ writer.abort(context);
+ }
+ }
+
+ private static HCatRecord getRecord(int i) {
+ List<Object> list = new ArrayList<Object>(2);
+ list.add("Row #: " + i);
+ list.add(i);
+ return new DefaultHCatRecord(list);
+ }
+
+ private static class HCatRecordItr implements Iterator<HCatRecord> {
+
+ int i = 0;
+
+ @Override
+ public boolean hasNext() {
+ return i++ < 100 ? true : false;
+ }
+
+ @Override
+ public HCatRecord next() {
+ return getRecord(i);
+ }
+
+ @Override
+ public void remove() {
+ throw new RuntimeException();
+ }
+ }
}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java Fri Jun 22 01:40:27 2012
@@ -41,73 +41,82 @@ import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hcatalog.common.HCatConstants;
-public class TestMsgBusConnection extends TestCase{
+public class TestMsgBusConnection extends TestCase {
- private Driver driver;
- private BrokerService broker;
- private MessageConsumer consumer;
-
- @Override
- protected void setUp() throws Exception {
-
- super.setUp();
- broker = new BrokerService();
- // configure the broker
- broker.addConnector("tcp://localhost:61616?broker.persistent=false");
-
- broker.start();
-
- System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
- System.setProperty("java.naming.provider.url", "tcp://localhost:61616");
- connectClient();
- HiveConf hiveConf = new HiveConf(this.getClass());
- hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName());
- hiveConf.set("hive.metastore.local", "true");
- hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
- hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat");
- SessionState.start(new CliSessionState(hiveConf));
- driver = new Driver(hiveConf);
- }
-
- private void connectClient() throws JMSException{
- ConnectionFactory connFac = new ActiveMQConnectionFactory("tcp://localhost:61616");
- Connection conn = connFac.createConnection();
- conn.start();
- Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
- Destination hcatTopic = session.createTopic("planetlab.hcat");
- consumer = session.createConsumer(hcatTopic);
- }
-
- public void testConnection() throws Exception{
-
- try{
- driver.run("create database testconndb");
- Message msg = consumer.receive();
- assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT));
- assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
- assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName());
- broker.stop();
- driver.run("drop database testconndb cascade");
- broker.start(true);
- connectClient();
- driver.run("create database testconndb");
- msg = consumer.receive();
- assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT));
- assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
- assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName());
- driver.run("drop database testconndb cascade");
- msg = consumer.receive();
- assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT));
- assertEquals("topic://planetlab.hcat",msg.getJMSDestination().toString());
- assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName());
- } catch (NoSuchObjectException nsoe){
- nsoe.printStackTrace(System.err);
- assert false;
- } catch (AlreadyExistsException aee){
- aee.printStackTrace(System.err);
- assert false;
- }
- }
+ private Driver driver;
+ private BrokerService broker;
+ private MessageConsumer consumer;
+
+ @Override
+ protected void setUp() throws Exception {
+
+ super.setUp();
+ broker = new BrokerService();
+ // configure the broker
+ broker.addConnector("tcp://localhost:61616?broker.persistent=false");
+
+ broker.start();
+
+ System.setProperty("java.naming.factory.initial",
+ "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+ System.setProperty("java.naming.provider.url", "tcp://localhost:61616");
+ connectClient();
+ HiveConf hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
+ NotificationListener.class.getName());
+ hiveConf.set("hive.metastore.local", "true");
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat");
+ SessionState.start(new CliSessionState(hiveConf));
+ driver = new Driver(hiveConf);
+ }
+
+ private void connectClient() throws JMSException {
+ ConnectionFactory connFac = new ActiveMQConnectionFactory(
+ "tcp://localhost:61616");
+ Connection conn = connFac.createConnection();
+ conn.start();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Destination hcatTopic = session.createTopic("planetlab.hcat");
+ consumer = session.createConsumer(hcatTopic);
+ }
+
+ public void testConnection() throws Exception {
+
+ try {
+ driver.run("create database testconndb");
+ Message msg = consumer.receive();
+ assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
+ msg.getStringProperty(HCatConstants.HCAT_EVENT));
+ assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
+ assertEquals("testconndb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ broker.stop();
+ driver.run("drop database testconndb cascade");
+ broker.start(true);
+ connectClient();
+ driver.run("create database testconndb");
+ msg = consumer.receive();
+ assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT,
+ msg.getStringProperty(HCatConstants.HCAT_EVENT));
+ assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
+ assertEquals("testconndb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ driver.run("drop database testconndb cascade");
+ msg = consumer.receive();
+ assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT,
+ msg.getStringProperty(HCatConstants.HCAT_EVENT));
+ assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString());
+ assertEquals("testconndb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ } catch (NoSuchObjectException nsoe) {
+ nsoe.printStackTrace(System.err);
+ assert false;
+ } catch (AlreadyExistsException aee) {
+ aee.printStackTrace(System.err);
+ assert false;
+ }
+ }
}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java Fri Jun 22 01:40:27 2012
@@ -58,126 +58,137 @@ import org.apache.thrift.TException;
import junit.framework.TestCase;
-public class TestNotificationListener extends TestCase implements MessageListener{
+public class TestNotificationListener extends TestCase implements
+ MessageListener {
- private HiveConf hiveConf;
- private Driver driver;
- private AtomicInteger cntInvocation = new AtomicInteger(0);
-
- @Override
- protected void setUp() throws Exception {
-
- super.setUp();
- System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
- System.setProperty("java.naming.provider.url", "vm://localhost?broker.persistent=false");
- ConnectionFactory connFac = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
- Connection conn = connFac.createConnection();
- conn.start();
- // We want message to be sent when session commits, thus we run in
- // transacted mode.
- Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
- Destination hcatTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
- MessageConsumer consumer1 = session.createConsumer(hcatTopic);
- consumer1.setMessageListener(this);
- Destination tblTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb.mytbl");
- MessageConsumer consumer2 = session.createConsumer(tblTopic);
- consumer2.setMessageListener(this);
- Destination dbTopic = session.createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".mydb");
- MessageConsumer consumer3 = session.createConsumer(dbTopic);
- consumer3.setMessageListener(this);
- hiveConf = new HiveConf(this.getClass());
- hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName());
- hiveConf.set("hive.metastore.local", "true");
- hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
- SessionState.start(new CliSessionState(hiveConf));
- driver = new Driver(hiveConf);
- }
-
- @Override
- protected void tearDown() throws Exception {
- assertEquals(7, cntInvocation.get());
- super.tearDown();
- }
-
- public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException,
- CommandNeedRetryException, UnknownDBException, InvalidPartitionException, UnknownPartitionException{
- driver.run("create database mydb");
- driver.run("use mydb");
- driver.run("create table mytbl (a string) partitioned by (b string)");
- driver.run("alter table mytbl add partition(b='2011')");
- HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
- Map<String,String> kvs = new HashMap<String, String>(1);
- kvs.put("b", "2011");
- msc.markPartitionForEvent("mydb", "mytbl", kvs, PartitionEventType.LOAD_DONE);
- driver.run("alter table mytbl drop partition(b='2011')");
- driver.run("drop table mytbl");
- driver.run("drop database mydb");
- }
-
- @Override
- public void onMessage(Message msg) {
- cntInvocation.incrementAndGet();
-
- String event;
- try {
- event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
- if(event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)){
-
- assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString());
- assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName());
- }
- else if(event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)){
-
- assertEquals("topic://hcat.mydb",msg.getJMSDestination().toString());
- Table tbl = (Table)(((ObjectMessage)msg).getObject());
- assertEquals("mytbl", tbl.getTableName());
- assertEquals("mydb", tbl.getDbName());
- assertEquals(1, tbl.getPartitionKeysSize());
- }
- else if(event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)){
-
- assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString());
- Partition part = (Partition)(((ObjectMessage)msg).getObject());
- assertEquals("mytbl", part.getTableName());
- assertEquals("mydb", part.getDbName());
- List<String> vals = new ArrayList<String>(1);
- vals.add("2011");
- assertEquals(vals,part.getValues());
- }
- else if(event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)){
-
- assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString());
- Partition part = (Partition)(((ObjectMessage)msg).getObject());
- assertEquals("mytbl", part.getTableName());
- assertEquals("mydb", part.getDbName());
- List<String> vals = new ArrayList<String>(1);
- vals.add("2011");
- assertEquals(vals,part.getValues());
- }
- else if(event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)){
-
- assertEquals("topic://hcat.mydb",msg.getJMSDestination().toString());
- Table tbl = (Table)(((ObjectMessage)msg).getObject());
- assertEquals("mytbl", tbl.getTableName());
- assertEquals("mydb", tbl.getDbName());
- assertEquals(1, tbl.getPartitionKeysSize());
- }
- else if(event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)){
-
- assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString());
- assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName());
- }
- else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
- assertEquals("topic://hcat.mydb.mytbl",msg.getJMSDestination().toString());
- MapMessage mapMsg = (MapMessage)msg;
- assert mapMsg.getString("b").equals("2011");
- } else
- assert false;
- } catch (JMSException e) {
- e.printStackTrace(System.err);
- assert false;
- }
- }
+ private HiveConf hiveConf;
+ private Driver driver;
+ private AtomicInteger cntInvocation = new AtomicInteger(0);
+
+ @Override
+ protected void setUp() throws Exception {
+
+ super.setUp();
+ System.setProperty("java.naming.factory.initial",
+ "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+ System.setProperty("java.naming.provider.url",
+ "vm://localhost?broker.persistent=false");
+ ConnectionFactory connFac = new ActiveMQConnectionFactory(
+ "vm://localhost?broker.persistent=false");
+ Connection conn = connFac.createConnection();
+ conn.start();
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Destination hcatTopic = session
+ .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
+ MessageConsumer consumer1 = session.createConsumer(hcatTopic);
+ consumer1.setMessageListener(this);
+ Destination tblTopic = session
+ .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb.mytbl");
+ MessageConsumer consumer2 = session.createConsumer(tblTopic);
+ consumer2.setMessageListener(this);
+ Destination dbTopic = session
+ .createTopic(HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX + ".mydb");
+ MessageConsumer consumer3 = session.createConsumer(dbTopic);
+ consumer3.setMessageListener(this);
+ hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
+ NotificationListener.class.getName());
+ hiveConf.set("hive.metastore.local", "true");
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ SessionState.start(new CliSessionState(hiveConf));
+ driver = new Driver(hiveConf);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ assertEquals(7, cntInvocation.get());
+ super.tearDown();
+ }
+
+ public void testAMQListener() throws MetaException, TException,
+ UnknownTableException, NoSuchObjectException, CommandNeedRetryException,
+ UnknownDBException, InvalidPartitionException, UnknownPartitionException {
+ driver.run("create database mydb");
+ driver.run("use mydb");
+ driver.run("create table mytbl (a string) partitioned by (b string)");
+ driver.run("alter table mytbl add partition(b='2011')");
+ HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
+ Map<String, String> kvs = new HashMap<String, String>(1);
+ kvs.put("b", "2011");
+ msc.markPartitionForEvent("mydb", "mytbl", kvs,
+ PartitionEventType.LOAD_DONE);
+ driver.run("alter table mytbl drop partition(b='2011')");
+ driver.run("drop table mytbl");
+ driver.run("drop database mydb");
+ }
+
+ @Override
+ public void onMessage(Message msg) {
+ cntInvocation.incrementAndGet();
+
+ String event;
+ try {
+ event = msg.getStringProperty(HCatConstants.HCAT_EVENT);
+ if (event.equals(HCatConstants.HCAT_ADD_DATABASE_EVENT)) {
+
+ assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
+ .getJMSDestination().toString());
+ assertEquals("mydb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ } else if (event.equals(HCatConstants.HCAT_ADD_TABLE_EVENT)) {
+
+ assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
+ Table tbl = (Table) (((ObjectMessage) msg).getObject());
+ assertEquals("mytbl", tbl.getTableName());
+ assertEquals("mydb", tbl.getDbName());
+ assertEquals(1, tbl.getPartitionKeysSize());
+ } else if (event.equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)) {
+
+ assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+ .toString());
+ Partition part = (Partition) (((ObjectMessage) msg).getObject());
+ assertEquals("mytbl", part.getTableName());
+ assertEquals("mydb", part.getDbName());
+ List<String> vals = new ArrayList<String>(1);
+ vals.add("2011");
+ assertEquals(vals, part.getValues());
+ } else if (event.equals(HCatConstants.HCAT_DROP_PARTITION_EVENT)) {
+
+ assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+ .toString());
+ Partition part = (Partition) (((ObjectMessage) msg).getObject());
+ assertEquals("mytbl", part.getTableName());
+ assertEquals("mydb", part.getDbName());
+ List<String> vals = new ArrayList<String>(1);
+ vals.add("2011");
+ assertEquals(vals, part.getValues());
+ } else if (event.equals(HCatConstants.HCAT_DROP_TABLE_EVENT)) {
+
+ assertEquals("topic://hcat.mydb", msg.getJMSDestination().toString());
+ Table tbl = (Table) (((ObjectMessage) msg).getObject());
+ assertEquals("mytbl", tbl.getTableName());
+ assertEquals("mydb", tbl.getDbName());
+ assertEquals(1, tbl.getPartitionKeysSize());
+ } else if (event.equals(HCatConstants.HCAT_DROP_DATABASE_EVENT)) {
+
+ assertEquals("topic://" + HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX, msg
+ .getJMSDestination().toString());
+ assertEquals("mydb",
+ ((Database) ((ObjectMessage) msg).getObject()).getName());
+ } else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
+ assertEquals("topic://hcat.mydb.mytbl", msg.getJMSDestination()
+ .toString());
+ MapMessage mapMsg = (MapMessage) msg;
+ assert mapMsg.getString("b").equals("2011");
+ } else
+ assert false;
+ } catch (JMSException e) {
+ e.printStackTrace(System.err);
+ assert false;
+ }
+ }
}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java Fri Jun 22 01:40:27 2012
@@ -110,18 +110,18 @@ public class TestHCatLoaderComplexSchema
String pigSchema =
"(" +
"a: " +
- "(" +
- "aa: chararray, " +
- "ab: long, " +
- "ac: map[], " +
- "ad: { t: (ada: long) }, " +
- "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," +
- "af: (afa: chararray, afb: long) " +
- ")," +
- "b: chararray, " +
- "c: long, " +
- "d: { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } " +
- ")";
+ "(" +
+ "aa: chararray, " +
+ "ab: long, " +
+ "ac: map[], " +
+ "ad: { t: (ada: long) }, " +
+ "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," +
+ "af: (afa: chararray, afb: long) " +
+ ")," +
+ "b: chararray, " +
+ "c: long, " +
+ "d: { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } " +
+ ")";
// with extra structs
String tableSchema =
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java?rev=1352738&r1=1352737&r2=1352738&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java Fri Jun 22 01:40:27 2012
@@ -132,7 +132,7 @@ public class TestHCatStorer extends Test
driver.run("drop table employee");
String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
- " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE";
+ " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE";
int retCode = driver.run(createTable).getResponseCode();
if(retCode != 0) {
@@ -148,7 +148,7 @@ public class TestHCatStorer extends Test
PigServer pig = new PigServer(ExecType.LOCAL);
pig.setBatchOn();
pig.registerQuery("A = LOAD '"+INPUT_FILE_NAME+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
- "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
+ "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
pig.registerQuery("TN = FILTER A BY emp_state == 'TN';");
pig.registerQuery("KA = FILTER A BY emp_state == 'KA';");
pig.registerQuery("KL = FILTER A BY emp_state == 'KL';");
@@ -415,7 +415,7 @@ public class TestHCatStorer extends Test
public void testBagNStruct() throws IOException, CommandNeedRetryException{
driver.run("drop table junit_unparted");
String createTable = "create table junit_unparted(b string,a struct<a1:int>, arr_of_struct array<string>, " +
- "arr_of_struct2 array<struct<s1:string,s2:string>>, arr_of_struct3 array<struct<s3:string>>) stored as RCFILE";
+ "arr_of_struct2 array<struct<s1:string,s2:string>>, arr_of_struct3 array<struct<s3:string>>) stored as RCFILE";
int retCode = driver.run(createTable).getResponseCode();
if(retCode != 0) {
throw new IOException("Failed to create table.");
@@ -430,7 +430,7 @@ public class TestHCatStorer extends Test
server.setBatchOn();
server.registerQuery("A = load '"+INPUT_FILE_NAME+"' as (b:chararray, a:tuple(a1:int), arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)});");
server.registerQuery("store A into 'default.junit_unparted' using "+HCatStorer.class.getName()+"('','b:chararray, a:tuple(a1:int)," +
- " arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)}');");
+ " arr_of_struct:bag{mytup:tuple(s1:chararray)}, arr_of_struct2:bag{mytup:tuple(s1:chararray,s2:chararray)}, arr_of_struct3:bag{t3:tuple(s3:chararray)}');");
server.executeBatch();
driver.run("select * from junit_unparted");