You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by ma...@apache.org on 2011/06/20 21:36:52 UTC

svn commit: r1137792 - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/listener/ src/java/org/apache/hcatalog/metadata/ src/test/org/apache/hcatalog/listener/

Author: macyang
Date: Mon Jun 20 21:36:52 2011
New Revision: 1137792

URL: http://svn.apache.org/viewvc?rev=1137792&view=rev
Log:
HCAT-46: Send a message on a message bus when a partition is marked done (hashutosh via macyang)

Removed:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/HPartitionDoneEvent.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/metadata/ormodel.jdo
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/build.xml
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Mon Jun 20 21:36:52 2011
@@ -6,6 +6,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
  
+    HCAT-46. Send a message on a message bus when a partition is marked done (hashutosh via macyang)
+
     HCAT-3. Send a message on a message bus when events occur in Metastore (hashutosh)
   
     HCAT-16. Add InputFormat/OutputFormat for handling exported tables/partitions.

Modified: incubator/hcatalog/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/build.xml?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/build.xml (original)
+++ incubator/hcatalog/trunk/build.xml Mon Jun 20 21:36:52 2011
@@ -231,7 +231,6 @@
       <compilerarg line="${javac.args}"/>
       <classpath refid="classpath" />
     </javac>
-  <antcall target="model-enhance" />
   </target>
  
   <!-- Build the hcatalog client jar -->
@@ -259,32 +258,6 @@
   -->
   <target name="jar" depends="clientjar,server-extensions"/>
 
-
- <!--
-  ================================================================================
-  Datanucleus Section
-  ================================================================================
-  -->
-
- <target name="model-enhance">
-    <taskdef name="datanucleusenhancer"
-                classname="org.datanucleus.enhancer.tools.EnhancerTask">
-       <classpath refid="classpath"/>
-   </taskdef>
-
-    <datanucleusenhancer
-        dir="${basedir}" failonerror="true" verbose="true" fork="true">
-    <fileset dir="${src.dir}/org/apache/hcatalog/metadata/">
-	<include name="ormodel.jdo" />
-    </fileset>   
-    <classpath>
-          <path refid="classpath"/>
-          <pathelement path="${build.dir}/classes/"/>
-        </classpath>
-        <jvmarg line="-Dlog4j.configuration=${basedir}/../conf/hive-log4j.properties"/>
-    </datanucleusenhancer>
-  </target>
-  
   <!--
   ================================================================================
   Test Section

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java Mon Jun 20 21:36:52 2011
@@ -77,6 +77,7 @@ public final class HCatConstants {
   public static final String HCAT_EVENT = "HCAT_EVENT";
   public static final String HCAT_ADD_PARTITION_EVENT = "HCAT_ADD_PARTITION";
   public static final String HCAT_DROP_PARTITION_EVENT = "HCAT_DROP_PARTITION";
+  public static final String HCAT_PARTITION_DONE_EVENT = "HCAT_PARTITION_DONE";
   public static final String HCAT_ADD_TABLE_EVENT = "HCAT_ADD_TABLE";
   public static final String HCAT_DROP_TABLE_EVENT = "HCAT_DROP_TABLE";
   public static final String HCAT_ADD_DATABASE_EVENT = "HCAT_ADD_DATABASE";

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java Mon Jun 20 21:36:52 2011
@@ -21,6 +21,8 @@ package org.apache.hcatalog.listener;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -28,6 +30,8 @@ import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 import javax.jms.Session;
@@ -54,6 +58,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
 import org.apache.hcatalog.common.HCatConstants;
 
 /**
@@ -69,8 +74,8 @@ import org.apache.hcatalog.common.HCatCo
 public class NotificationListener extends MetaStoreEventListener{
 
 	private static final Log LOG = LogFactory.getLog(NotificationListener.class);
-	private Session session;
-	private Connection conn;
+	protected Session session;
+	protected Connection conn;
 
 	/**
 	 * Create message bus connection and session in constructor.
@@ -202,7 +207,7 @@ public class NotificationListener extend
 	 * @param event is the value of HCAT_EVENT property in message. It can be 
 	 * used to select messages in client side. 
 	 */
-	private void send(Serializable msgBody, String topicName, String event){
+	protected void send(Object msgBody, String topicName, String event){
 
 		try{
 
@@ -235,7 +240,19 @@ public class NotificationListener extend
 				return;
 			}
 			MessageProducer producer = session.createProducer(topic);
-			ObjectMessage msg = session.createObjectMessage(msgBody);
+			Message msg;
+			if (msgBody instanceof Map){
+				MapMessage mapMsg = session.createMapMessage();
+				Map<String,String> incomingMap = (Map<String,String>)msgBody;
+				for (Entry<String,String> partCol : incomingMap.entrySet()){
+					mapMsg.setString(partCol.getKey(), partCol.getValue());
+				}
+				msg = mapMsg;
+			}
+			else {
+				msg = session.createObjectMessage((Serializable)msgBody);
+			}
+
 			msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
 			producer.send(msg);
 			// Message must be transacted before we return.
@@ -289,4 +306,11 @@ public class NotificationListener extend
 			LOG.info("Failed to close message bus connection.", ignore);
 		}
 	}
+
+	@Override
+	public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
+			throws MetaException {
+		if(lpde.getStatus())
+			send(lpde.getPartitionName(),lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),HCatConstants.HCAT_PARTITION_DONE_EVENT);
+	}
 }

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java?rev=1137792&r1=1137791&r2=1137792&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestNotificationListener.java Mon Jun 20 21:36:52 2011
@@ -19,13 +19,16 @@
 package org.apache.hcatalog.listener;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -36,11 +39,16 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionEventType;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
@@ -89,15 +97,20 @@ public class TestNotificationListener ex
 
 	@Override
 	protected void tearDown() throws Exception {
-		assertEquals(6, cntInvocation.get());
+		assertEquals(7, cntInvocation.get());
 		super.tearDown();
 	}
 
-	public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, CommandNeedRetryException{
+	public void testAMQListener() throws MetaException, TException, UnknownTableException, NoSuchObjectException, 
+	CommandNeedRetryException, UnknownDBException, InvalidPartitionException, UnknownPartitionException{
 		driver.run("create database mydb");
 		driver.run("use mydb");
 		driver.run("create table mytbl (a string) partitioned by (b string)");
 		driver.run("alter table mytbl add partition(b='2011')");
+		HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
+		Map<String,String> kvs = new HashMap<String, String>(1);
+		kvs.put("b", "2011");
+		msc.markPartitionForEvent("mydb", "mytbl", kvs, PartitionEventType.LOAD_DONE);
 		driver.run("alter table mytbl drop partition(b='2011')");
 		driver.run("drop table mytbl");
 		driver.run("drop database mydb");
@@ -156,7 +169,11 @@ public class TestNotificationListener ex
 				assertEquals("topic://"+HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX,msg.getJMSDestination().toString());
 				assertEquals("mydb", ((Database) ((ObjectMessage)msg).getObject()).getName());
 			}
-			else
+			else if (event.equals(HCatConstants.HCAT_PARTITION_DONE_EVENT)) {
+				assertEquals("topic://HCAT.mydb.mytbl",msg.getJMSDestination().toString());
+				MapMessage mapMsg = (MapMessage)msg;
+				assert mapMsg.getString("b").equals("2011");
+			} else
 				assert false;
 		} catch (JMSException e) {
 			e.printStackTrace(System.err);