You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by ma...@apache.org on 2011/06/20 21:36:52 UTC
svn commit: r1137792 - in /incubator/hcatalog/trunk: ./
src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/listener/
src/java/org/apache/hcatalog/metadata/ src/test/org/apache/hcatalog/listener/
Author: macyang
Date: Mon Jun 20 21:36:52 2011
New Revision: 1137792
URL: http://svn.apache.org/viewvc?rev=1137792&view=rev
Log:
HCAT-46: Send a message on a message bus when a partition is marked done (hashutosh via macyang)
Removed:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/ormodel.jdo
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/build.xml
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Mon Jun 20 21:36:52 2011
@@ -6,6 +6,8 @@ Trunk (unreleased changes)
NEW FEATURES
+ HCAT-46. Send a message on a message bus when a partition is marked done (hashutosh via macyang)
+
HCAT-3. Send a message on a message bus when events occur in Metastore (hashutosh)
HCAT-16. Add InputFormat/OutputFormat for handling exported tables/partitions.
Modified: incubator/hcatalog/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/build.xml?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/build.xml (original)
+++ incubator/hcatalog/trunk/build.xml Mon Jun 20 21:36:52 2011
@@ -231,7 +231,6 @@
<compilerarg line="${javac.args}"/>
<classpath refid="classpath" />
</javac>
- <antcall target="model-enhance" />
</target>
<!-- Build the hcatalog client jar -->
@@ -259,32 +258,6 @@
-->
<target name="jar" depends="clientjar,server-extensions"/>
-
- <!--
- ================================================================================
- Datanucleus Section
- ================================================================================
- -->
-
- <target name="model-enhance">
- <taskdef name="datanucleusenhancer"
- classname="org.datanucleus.enhancer.tools.EnhancerTask">
- <classpath refid="classpath"/>
- </taskdef>
-
- <datanucleusenhancer
- dir="${basedir}" failonerror="true" verbose="true" fork="true">
- <fileset dir="${src.dir}/org/apache/hcatalog/metadata/">
- <include name="ormodel.jdo" />
- </fileset>
- <classpath>
- <path refid="classpath"/>
- <pathelement path="${build.dir}/classes/"/>
- </classpath>
- <jvmarg line="-Dlog4j.configuration=${basedir}/../conf/hive-log4j.properties"/>
- </datanucleusenhancer>
- </target>
-
<!--
================================================================================
Test Section
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java Mon Jun 20 21:36:52 2011
@@ -77,6 +77,7 @@ public final class HCatConstants {
public static final String HCAT_EVENT = "HCAT_EVENT";
public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION";
public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION";
+ public static final String HCAT_PARTITION_DONE_EVENT = "HCAT_PARTITION_DONE";
public static final String HCAT_ADD_TABLE_EVENT = "HCAT_ADD_TABLE";
public static final String HCAT_DROP_TABLE_EVENT = "HCAT_DROP_TABLE";
public static final String HCAT_ADD_DATABASE_EVENT = "HCAT_ADD_DATABASE";
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java Mon Jun 20 21:36:52 2011
@@ -21,6 +21,8 @@ package org.apache.hcatalog.listener;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -28,6 +30,8 @@ import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
@@ -54,6 +58,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hcatalog.common.HCatConstants;
/**
@@ -69,8 +74,8 @@ import org.apache.hcatalog.common.HCatCo
public class NotificationListener extends MetaStoreEventListener{
private static final Log LOG = LogFactory.getLog(NotificationListener.class);
- private Session session;
- private Connection conn;
+ protected Session session;
+ protected Connection conn;
/**
* Create message bus connection and session in constructor.
@@ -202,7 +207,7 @@ public class NotificationListener extend
* @param event is the value of HCAT_EVENT property in message. It can be
* used to select messages in client side.
*/
- private void send(Serializable msgBody, String topicName, String event){
+ protected void send(Object msgBody, String topicName, String event){
try{
@@ -235,7 +240,19 @@ public class NotificationListener extend
return;
}
MessageProducer producer = session.createProducer(topic);
- ObjectMessage msg = session.createObjectMessage(msgBody);
+ Message msg;
+ if (msgBody instanceof Map){
+ MapMessage mapMsg = session.createMapMessage();
+ Map<String,String> incomingMap = (Map<String,String>)msgBody;
+ for (Entry<String,String> partCol : incomingMap.entrySet()){
+ mapMsg.setString(partCol.getKey(), partCol.getValue());
+ }
+ msg = mapMsg;
+ }
+ else {
+ msg = session.createObjectMessage((Serializable)msgBody);
+ }
+
msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
producer.send(msg);
// Message must be transacted before we return.
@@ -289,4 +306,11 @@ public class NotificationListener extend
LOG.info("Failed to close message bus connection.", ignore);
}
}
+
+ @Override
+ public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
+ throws MetaException {
+ if(lpde.getStatus())
+ send(lpde.getPartitionName(),lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),HCatConstants.HCAT_PARTITION_DONE_EVENT);
+ }
}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java Mon Jun 20 21:36:52 2011
@@ -19,13 +19,16 @@
package org.apache.hcatalog.listener;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -36,11 +39,16 @@ import org.apache.activemq.ActiveMQConne
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
@@ -89,15 +97,20 @@ public class TestNotificationListener ex
@Override
protected void tearDown() throws Exception {
- assertEquals(6, cntInvocation.get());
+ assertEquals(7, cntInvocation.get());
super.tearDown();
}
- public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, CommandNeedRetryException{
+ public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException,
+ CommandNeedRetryException, UnknownDBException, InvalidPartitionException, UnknownPartitionException{
driver.run("create database mydb");
driver.run("use mydb");
driver.run("create table mytbl (a string) partitioned by (b string)");
driver.run("alter table mytbl add partition(b='2011')");
+ HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
+ Map<String,String> kvs = new HashMap<String, String>(1);
+ kvs.put("b", "2011");
+ msc.markPartitionForEvent("mydb", "mytbl", kvs, PartitionEventType.LOAD_DONE);
driver.run("alter table mytbl drop partition(b='2011')");
driver.run("drop table mytbl");
driver.run("drop database mydb");
@@ -156,7 +169,11 @@ public class TestNotificationListener ex
assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString());
assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName());
}
- else
+ else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
+ assertEquals("topic://HCAT.mydb.mytbl",msg.getJMSDestination().toString());
+ MapMessage mapMsg = (MapMessage)msg;
+ assert mapMsg.getString("b").equals("2011");
+ } else
assert false;
} catch (JMSException e) {
e.printStackTrace(System.err);