You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ka...@apache.org on 2012/11/27 08:40:37 UTC
svn commit: r1414006 - in /oozie/branches/hcat-intre: ./ core/
core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/jms/
core/src/main/java/org/apache/oozie/service/
core/src/test/java/org/apache/oozie/jms/
Author: kamrul
Date: Tue Nov 27 07:40:35 2012
New Revision: 1414006
URL: http://svn.apache.org/viewvc?rev=1414006&view=rev
Log:
OOZIE-1050 Implement logic to update dependencies via push JMS message(mona via mohammad)
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java
Modified:
oozie/branches/hcat-intre/core/pom.xml
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
oozie/branches/hcat-intre/pom.xml
oozie/branches/hcat-intre/release-log.txt
Modified: oozie/branches/hcat-intre/core/pom.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/pom.xml?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/pom.xml (original)
+++ oozie/branches/hcat-intre/core/pom.xml Tue Nov 27 07:40:35 2012
@@ -40,6 +40,16 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -67,6 +77,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.hcatalog</groupId>
+ <artifactId>hcatalog-core</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.openjpa</groupId>
<artifactId>openjpa-persistence</artifactId>
<scope>compile</scope>
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java Tue Nov 27 07:40:35 2012
@@ -233,6 +233,7 @@ public enum ErrorCode {
E1502(XLog.STD, "Partition cache lookup error"),
E1503(XLog.STD, "Error in Metadata URI [{0}]"),
E1504(XLog.STD, "Error in getting HCat Access [{0}]"),
+ E1505(XLog.STD, "Error with JMS Message, Details: [{0}]"),
ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id [{0}]"),;
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java?rev=1414006&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java Tue Nov 27 07:40:35 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Message;
+
+import org.apache.hcatalog.messaging.AddPartitionMessage;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.jms.MessagingUtils;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.service.MetadataServiceException;
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.XLog;
+
+public class HCatMessageHandler implements MessageHandler {
+
+ private PartitionWrapper msgPartition;
+ private static XLog log;
+
+ HCatMessageHandler() {
+ log = XLog.getLog(getClass());
+ }
+
+ /**
+ * Process JMS message produced by HCat.
+ *
+ * @param msg : to be processed
+ * @throws MetadataServiceException
+ */
+ @Override
+ public void process(Message msg) throws MetadataServiceException {
+ try {
+ HCatEventMessage hcatMsg = MessagingUtils.getMessage(msg);
+ if (hcatMsg.getEventType().equals(HCatEventMessage.EventType.ADD_PARTITION)) {
+ // Parse msg components
+ AddPartitionMessage partMsg = (AddPartitionMessage) hcatMsg;
+ String server = partMsg.getServer();
+ String db = partMsg.getDB();
+ String table = partMsg.getTable();
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ if (pdms != null) {
+ // message is batched. therefore iterate through partitions
+ List<Map<String, String>> partitions = partMsg.getPartitions();
+ for (int i = 0; i < partitions.size(); i++) {
+ msgPartition = new PartitionWrapper(server, db, table, partitions.get(i));
+ if (!pdms.partitionAvailable(msgPartition)) {
+ log.warn(
+ "Partition map not updated. Message might be incorrect or partition [{0}] might be non-relevant",
+ msgPartition.toString());
+ }
+ else {
+ log.debug("Partition [{0}] updated from missing -> available in partition map",
+ msgPartition.toString());
+ }
+ }
+ }
+ else {
+ log.error("Partition dependency map is NULL");
+ }
+ }
+ else if (hcatMsg.getEventType().equals(HCatEventMessage.EventType.DROP_PARTITION)) {
+ log.info("Message is of type [{0}]", HCatEventMessage.EventType.DROP_PARTITION.toString());
+ }
+ else if (hcatMsg.getEventType().equals(HCatEventMessage.EventType.DROP_TABLE)) {
+ log.info("Message is of type [{0}]", HCatEventMessage.EventType.DROP_TABLE.toString());
+ }
+ }
+ catch (IllegalArgumentException iae) {
+ throw new MetadataServiceException(ErrorCode.E1505, iae);
+ }
+ }
+
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java Tue Nov 27 07:40:35 2012
@@ -19,11 +19,14 @@ package org.apache.oozie.jms;
import javax.jms.Message;
+import org.apache.oozie.service.MetadataServiceException;
+
public interface MessageHandler {
/**
* Process a generic JMS message.
*
* @param msg : to be processed
+ * @throws MetadataServiceException
*/
- public void process(Message msg);
+ public void process(Message msg) throws MetadataServiceException;
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java Tue Nov 27 07:40:35 2012
@@ -23,6 +23,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.MetadataServiceException;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.XLog;
@@ -93,7 +94,12 @@ public class MessageReceiver implements
@Override
public synchronized void onMessage(Message msg) {
if (msgHandler != null) {
- msgHandler.process(msg);
+ try {
+ msgHandler.process(msg);
+ }
+ catch (MetadataServiceException e) {
+ LOG.warn("Unable to process message from bus ", e);
+ }
}
else {
LOG.info("Message handler none. Unprocessed messsage " + msg);
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java Tue Nov 27 07:40:35 2012
@@ -161,7 +161,6 @@ public class PartitionDependencyManagerS
* @param actionId
* @throws MetadataServiceException
*/
- @SuppressWarnings("unused")
public void addMissingPartition(PartitionWrapper partition, String actionId) throws MetadataServiceException {
String prefix = PartitionWrapper.makePrefix(partition.getServerName(), partition.getDbName());
Map<String, PartitionsGroup> tablePartitionsMap;
Added: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java?rev=1414006&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java (added)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java Tue Nov 27 07:40:35 2012
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.jms;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.json.JSONAddPartitionMessage;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+import org.apache.log4j.WriterAppender;
+import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.PartitionsGroup;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the message handling specific to HCat partition messages updating the
+ * missing dependency cache
+ */
+public class TestHCatMessageHandler extends XDataTestCase {
+
+ private Services services;
+ private String javaNamingProviderUrl;
+ private ConnectionFactory connFac;
+ private Connection conn;
+ private Session session;
+
+ @Before
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ Configuration conf = services.getConf();
+ conf.set(Services.CONF_SERVICE_CLASSES,
+ StringUtils.join(",", Arrays.asList(JMSAccessorService.class.getName())));
+ System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+ javaNamingProviderUrl = "vm://localhost?broker.persistent=false";
+ System.setProperty("java.naming.provider.url", javaNamingProviderUrl);
+ addServiceToRun(services.getConf(), PartitionDependencyManagerService.class.getName());
+ services.init();
+
+ connFac = new ActiveMQConnectionFactory(javaNamingProviderUrl);
+ conn = connFac.createConnection();
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ @After
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ /**
+ * Test HCat message is received by the HCatMessageHandler implementation
+ * Can be used with actual ActiveMQ installation
+ */
+ @Test
+ public void testProcessHCatMessage() {
+ String topicName = "hcat.default.mytbl"; // Assuming you gave TABLE as
+ // 'mytbl'
+ try {
+ MessageReceiver recvr = new MessageReceiver(new HCatMessageHandler());
+ recvr.registerTopic(topicName);
+ Thread.sleep(5000);
+ recvr.unRegisterTopic(topicName);
+ }
+ catch (Exception e) {
+ fail("Exception caused " + e.getMessage());
+ }
+ }
+
+ /**
+ * Generic test from ActiveMQ messaging system to see message is received
+ * and has the expected type
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMessage() throws Exception {
+ String topicName = "hcat.default.mytbl"; // Assuming you gave TABLE as
+ // 'mytbl'
+ MessageReceiver recvr = new MessageReceiver(new MessageHandler() {
+ public void process(Message msg) {
+ assertTrue(msg instanceof ActiveMQTextMessage);
+ }
+ });
+ recvr.registerTopic(topicName);
+ Thread.sleep(5000);
+ recvr.unRegisterTopic(topicName);
+ }
+
+ /**
+ * Test that message is processed to update the dependency map and mark
+ * partition as available
+ */
+ @Test
+ public void testCacheUpdateByMessage() {
+
+ try {
+ // Define partition dependency
+ String stringDep = "hcat://hcat.yahoo.com:5080/mydb/mytbl/?datastamp=12®ion=us";
+ HCatURI dep = new HCatURI(stringDep);
+ List<Map<String, String>> partitions = new ArrayList<Map<String, String>>(1);
+ partitions.add(dep.getPartitionMap());
+
+ // add dummy partition as missing
+ PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+ String actionId = "action1";
+ pdms.addMissingPartition(new PartitionWrapper(dep), actionId);
+
+ // construct message
+ JSONAddPartitionMessage jsonMsg = new JSONAddPartitionMessage(dep.getServer(), "", dep.getDb(),
+ dep.getTable(), partitions, System.currentTimeMillis());
+ Message msg = session.createTextMessage(jsonMsg.toString());
+ msg.setStringProperty(HCatConstants.HCAT_EVENT, HCatEventMessage.EventType.ADD_PARTITION.toString());
+
+ // process message
+ HCatMessageHandler hcatHandler = new HCatMessageHandler();
+ hcatHandler.process(msg);
+
+ // check updated map
+ Map<String, List<PartitionWrapper>> availMap = pdms.getAvailableMap();
+ assertNotNull(availMap);
+
+ //positive test
+ assertTrue(availMap.containsKey(actionId)); //found in 'available' cache
+ assertFalse(pdms.getHCatMap().containsKey(dep.getTable())); // removed from 'missing' cache cascade ON
+ assertEquals(availMap.get(actionId).get(0), new PartitionWrapper(dep));
+
+ // bunch of other partitions
+ stringDep = "hcat://hcat.yahoo.com:5080/mydb/mytbl/?user=joe";
+ dep = new HCatURI(stringDep);
+ pdms.addMissingPartition(new PartitionWrapper(dep), actionId);
+ stringDep = "hcat://hcat.yahoo.com:5080/mydb/mytbl/?part=fake";
+ dep = new HCatURI(stringDep);
+ partitions = new ArrayList<Map<String, String>>(1);
+ partitions.add(dep.getPartitionMap());
+
+ // negative test - message for partition that does not exist in
+ // partition dependency cache
+ jsonMsg = new JSONAddPartitionMessage(dep.getServer(), "", dep.getDb(), dep.getTable(), partitions,
+ System.currentTimeMillis());
+ msg = session.createTextMessage(jsonMsg.toString());
+ msg.setStringProperty(HCatConstants.HCAT_EVENT, HCatEventMessage.EventType.ADD_PARTITION.toString());
+
+ hcatHandler.process(msg);
+
+ PartitionsGroup pg = pdms.getHCatMap().get(PartitionWrapper.makePrefix(dep.getServer(), dep.getDb()))
+ .get(dep.getTable());
+ assertFalse(pg.getPartitionsMap().containsKey(new PartitionWrapper(dep)));
+
+ }
+ catch (Exception e) {
+ fail("Exception caused " + e.getMessage());
+ }
+
+ }
+
+ /**
+ * Test the other type of messages - DROP_PARTITION and DROP_TABLE are
+ * handled with the correct log messages
+ */
+ public void testDropEventTypeMessage() {
+ try{
+ // Set the log4j appender for getting the statements logged by
+ // HCatMessageHandler
+ Logger logger = Logger.getLogger(HCatMessageHandler.class);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ Layout layout = new SimpleLayout();
+ Appender appender = new WriterAppender(layout, out);
+ logger.addAppender(appender);
+
+ Message msg = session.createTextMessage("{" +
+ "\"server\" : \"thrift://localhost:1234\"," +
+ "\"db\" : \"default\"," +
+ "\"table\" : \"newTable\"," +
+ "\"timestamp\" : \"123456\"," +
+ "\"partitions\" : [{ \"dt\" : \"2012_01_01\", \"grid\" : \"AB\" }]" +
+ "}");
+ msg.setStringProperty(HCatConstants.HCAT_EVENT, HCatEventMessage.EventType.DROP_PARTITION.toString());
+
+ HCatMessageHandler hcatHandler = new HCatMessageHandler();
+ hcatHandler.process(msg);
+ //check logs to see appropriate error message
+ String logMsg = out.toString();
+ assertTrue(logMsg.contains(HCatEventMessage.EventType.DROP_PARTITION.toString()));
+
+ msg.setStringProperty(HCatConstants.HCAT_EVENT, HCatEventMessage.EventType.DROP_TABLE.toString());
+ hcatHandler.process(msg);
+ //check logs to see appropriate error message
+ logMsg = out.toString();
+ assertTrue(logMsg.contains(HCatEventMessage.EventType.DROP_TABLE.toString()));
+ }
+ catch (Exception e) {
+ fail("Exception caused " + e.getMessage());
+ }
+ }
+}
Modified: oozie/branches/hcat-intre/pom.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/pom.xml?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/pom.xml (original)
+++ oozie/branches/hcat-intre/pom.xml Tue Nov 27 07:40:35 2012
@@ -81,6 +81,7 @@
<hive.version>0.9.0</hive.version>
<pig.version>0.9.0</pig.version>
<webhcat.version>0.5.0-SNAPSHOT</webhcat.version>
+ <hcatalog.version>0.5.0-SNAPSHOT</hcatalog.version>
<sqoop.version>1.5.0-incubating-SNAPSHOT</sqoop.version>
<streaming.version>${hadoop.version}</streaming.version>
<distcp.version>${hadooplib.version}</distcp.version>
@@ -278,6 +279,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.hcatalog</groupId>
+ <artifactId>hcatalog-core</artifactId>
+ <version>${hcatalog.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.openjpa</groupId>
<artifactId>openjpa-persistence</artifactId>
<version>2.1.0</version>
Modified: oozie/branches/hcat-intre/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Tue Nov 27 07:40:35 2012
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-1050 Implement logic to update dependencies via push JMS message(mona via mohammad)
OOZIE-1068 Metadata Accessor service for HCatalog(mohammad)
OOZIE-1069 Update dataIn and dataOut EL functions to support partitions (mohammad)
OOZIE-1043 Add logic to register to Missing Dependency Structure in coord action materialization (ryota via mohammad)