You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ka...@apache.org on 2012/11/27 08:40:37 UTC

svn commit: r1414006 - in /oozie/branches/hcat-intre: ./ core/ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/jms/ core/src/main/java/org/apache/oozie/service/ core/src/test/java/org/apache/oozie/jms/

Author: kamrul
Date: Tue Nov 27 07:40:35 2012
New Revision: 1414006

URL: http://svn.apache.org/viewvc?rev=1414006&view=rev
Log:
OOZIE-1050 Implement logic to update dependencies via push JMS message(mona via mohammad)

Added:
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java
Modified:
    oozie/branches/hcat-intre/core/pom.xml
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
    oozie/branches/hcat-intre/pom.xml
    oozie/branches/hcat-intre/release-log.txt

Modified: oozie/branches/hcat-intre/core/pom.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/pom.xml?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/pom.xml (original)
+++ oozie/branches/hcat-intre/core/pom.xml Tue Nov 27 07:40:35 2012
@@ -40,6 +40,16 @@
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase</artifactId>
             <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-core-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-mapper-asl</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -67,6 +77,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.hcatalog</groupId>
+            <artifactId>hcatalog-core</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.openjpa</groupId>
             <artifactId>openjpa-persistence</artifactId>
             <scope>compile</scope>

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java Tue Nov 27 07:40:35 2012
@@ -233,6 +233,7 @@ public enum ErrorCode {
     E1502(XLog.STD, "Partition cache lookup error"),
     E1503(XLog.STD, "Error in Metadata URI [{0}]"),
     E1504(XLog.STD, "Error in getting HCat Access [{0}]"),
+    E1505(XLog.STD, "Error with JMS Message, Details: [{0}]"),
 
     ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id [{0}]"),;
 

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java?rev=1414006&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/HCatMessageHandler.java Tue Nov 27 07:40:35 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Message;
+
+import org.apache.hcatalog.messaging.AddPartitionMessage;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.jms.MessagingUtils;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.service.MetadataServiceException;
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.XLog;
+
+public class HCatMessageHandler implements MessageHandler {
+
+    private PartitionWrapper msgPartition;
+    private static XLog log;
+
+    HCatMessageHandler() {
+        log = XLog.getLog(getClass());
+    }
+
+    /**
+     * Process JMS message produced by HCat.
+     *
+     * @param msg : to be processed
+     * @throws MetadataServiceException
+     */
+    @Override
+    public void process(Message msg) throws MetadataServiceException {
+        try {
+            HCatEventMessage hcatMsg = MessagingUtils.getMessage(msg);
+            if (hcatMsg.getEventType().equals(HCatEventMessage.EventType.ADD_PARTITION)) {
+                // Parse msg components
+                AddPartitionMessage partMsg = (AddPartitionMessage) hcatMsg;
+                String server = partMsg.getServer();
+                String db = partMsg.getDB();
+                String table = partMsg.getTable();
+                PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+                if (pdms != null) {
+                    // message is batched. therefore iterate through partitions
+                    List<Map<String, String>> partitions = partMsg.getPartitions();
+                    for (int i = 0; i < partitions.size(); i++) {
+                        msgPartition = new PartitionWrapper(server, db, table, partitions.get(i));
+                        if (!pdms.partitionAvailable(msgPartition)) {
+                            log.warn(
+                                    "Partition map not updated. Message might be incorrect or partition [{0}] might be non-relevant",
+                                    msgPartition.toString());
+                        }
+                        else {
+                            log.debug("Partition [{0}] updated from missing -> available in partition map",
+                                    msgPartition.toString());
+                        }
+                    }
+                }
+                else {
+                    log.error("Partition dependency map is NULL");
+                }
+            }
+            else if (hcatMsg.getEventType().equals(HCatEventMessage.EventType.DROP_PARTITION)) {
+                log.info("Message is of type [{0}]", HCatEventMessage.EventType.DROP_PARTITION.toString());
+            }
+            else if (hcatMsg.getEventType().equals(HCatEventMessage.EventType.DROP_TABLE)) {
+                log.info("Message is of type [{0}]", HCatEventMessage.EventType.DROP_TABLE.toString());
+            }
+        }
+        catch (IllegalArgumentException iae) {
+            throw new MetadataServiceException(ErrorCode.E1505, iae);
+        }
+    }
+
+}

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java Tue Nov 27 07:40:35 2012
@@ -19,11 +19,14 @@ package org.apache.oozie.jms;
 
 import javax.jms.Message;
 
+import org.apache.oozie.service.MetadataServiceException;
+
 public interface MessageHandler {
     /**
      * Process a generic JMS message.
      *
      * @param msg : to be processed
+     * @throws MetadataServiceException
      */
-    public void process(Message msg);
+    public void process(Message msg) throws MetadataServiceException;
 }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java Tue Nov 27 07:40:35 2012
@@ -23,6 +23,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 
 import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.MetadataServiceException;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.XLog;
 
@@ -93,7 +94,12 @@ public class MessageReceiver implements 
     @Override
     public synchronized void onMessage(Message msg) {
         if (msgHandler != null) {
-            msgHandler.process(msg);
+            try {
+                msgHandler.process(msg);
+            }
+            catch (MetadataServiceException e) {
+                LOG.warn("Unable to process message from bus ", e);
+            }
         }
         else {
             LOG.info("Message handler none. Unprocessed messsage " + msg);

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java Tue Nov 27 07:40:35 2012
@@ -161,7 +161,6 @@ public class PartitionDependencyManagerS
      * @param actionId
      * @throws MetadataServiceException
      */
-    @SuppressWarnings("unused")
     public void addMissingPartition(PartitionWrapper partition, String actionId) throws MetadataServiceException {
         String prefix = PartitionWrapper.makePrefix(partition.getServerName(), partition.getDbName());
         Map<String, PartitionsGroup> tablePartitionsMap;

Added: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java?rev=1414006&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java (added)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestHCatMessageHandler.java Tue Nov 27 07:40:35 2012
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.jms;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.json.JSONAddPartitionMessage;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+import org.apache.log4j.WriterAppender;
+import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.PartitionsGroup;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the message handling specific to HCat partition messages updating the
+ * missing dependency cache
+ */
+public class TestHCatMessageHandler extends XDataTestCase {
+
+    private Services services;
+    private String javaNamingProviderUrl;
+    private ConnectionFactory connFac;
+    private Connection conn;
+    private Session session;
+
+    @Before
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        Configuration conf = services.getConf();
+        conf.set(Services.CONF_SERVICE_CLASSES,
+                StringUtils.join(",", Arrays.asList(JMSAccessorService.class.getName())));
+        System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+        javaNamingProviderUrl = "vm://localhost?broker.persistent=false";
+        System.setProperty("java.naming.provider.url", javaNamingProviderUrl);
+        addServiceToRun(services.getConf(), PartitionDependencyManagerService.class.getName());
+        services.init();
+
+        connFac = new ActiveMQConnectionFactory(javaNamingProviderUrl);
+        conn = connFac.createConnection();
+        session = conn.createSession(true, Session.SESSION_TRANSACTED); // used only to build test messages
+    }
+
+    @After
+    protected void tearDown() throws Exception {
+        services.destroy(); // NOTE(review): conn and session opened in setUp are not closed here
+        super.tearDown();
+    }
+
+    /**
+     * Smoke test: registers an HCatMessageHandler receiver on a topic, waits 5s and
+     * unregisters; fails only on an exception. Can be used with actual ActiveMQ installation
+     */
+    @Test
+    public void testProcessHCatMessage() {
+        String topicName = "hcat.default.mytbl"; // Assuming you gave TABLE as
+                                                 // 'mytbl'
+        try {
+            MessageReceiver recvr = new MessageReceiver(new HCatMessageHandler());
+            recvr.registerTopic(topicName);
+            Thread.sleep(5000);
+            recvr.unRegisterTopic(topicName);
+        }
+        catch (Exception e) {
+            fail("Exception caused " + e.getMessage());
+        }
+    }
+
+    /**
+     * Generic test from ActiveMQ messaging system to see message is received
+     * and has the expected type (checked only if a message arrives within the 5s wait)
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMessage() throws Exception {
+        String topicName = "hcat.default.mytbl"; // Assuming you gave TABLE as
+                                                 // 'mytbl'
+        MessageReceiver recvr = new MessageReceiver(new MessageHandler() {
+            public void process(Message msg) {
+                assertTrue(msg instanceof ActiveMQTextMessage);
+            }
+        });
+        recvr.registerTopic(topicName);
+        Thread.sleep(5000);
+        recvr.unRegisterTopic(topicName);
+    }
+
+    /**
+     * Test that message is processed to update the dependency map and mark
+     * partition as available
+     */
+    @Test
+    public void testCacheUpdateByMessage() {
+
+        try {
+            // Define partition dependency
+            String stringDep = "hcat://hcat.yahoo.com:5080/mydb/mytbl/?datastamp=12&region=us";
+            HCatURI dep = new HCatURI(stringDep);
+            List<Map<String, String>> partitions = new ArrayList<Map<String, String>>(1);
+            partitions.add(dep.getPartitionMap());
+
+            // add dummy partition as missing
+            PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+            String actionId = "action1";
+            pdms.addMissingPartition(new PartitionWrapper(dep), actionId);
+
+            // construct an ADD_PARTITION message carrying that same partition
+            JSONAddPartitionMessage jsonMsg = new JSONAddPartitionMessage(dep.getServer(), "", dep.getDb(),
+                    dep.getTable(), partitions, System.currentTimeMillis());
+            Message msg = session.createTextMessage(jsonMsg.toString());
+            msg.setStringProperty(HCatConstants.HCAT_EVENT, HCatEventMessage.EventType.ADD_PARTITION.toString());
+
+            // process message by calling the handler directly (no broker round trip)
+            HCatMessageHandler hcatHandler = new HCatMessageHandler();
+            hcatHandler.process(msg);
+
+            // check updated map
+            Map<String, List<PartitionWrapper>> availMap = pdms.getAvailableMap();
+            assertNotNull(availMap);
+
+            // positive test - the registered partition must now be reported as available
+            assertTrue(availMap.containsKey(actionId)); //found in 'available' cache
+            assertFalse(pdms.getHCatMap().containsKey(dep.getTable())); // removed from 'missing' cache cascade ON
+            assertEquals(availMap.get(actionId).get(0), new PartitionWrapper(dep));
+
+            // register another missing partition, then build a message for an unregistered one
+            stringDep = "hcat://hcat.yahoo.com:5080/mydb/mytbl/?user=joe";
+            dep = new HCatURI(stringDep);
+            pdms.addMissingPartition(new PartitionWrapper(dep), actionId);
+            stringDep = "hcat://hcat.yahoo.com:5080/mydb/mytbl/?part=fake";
+            dep = new HCatURI(stringDep);
+            partitions = new ArrayList<Map<String, String>>(1);
+            partitions.add(dep.getPartitionMap());
+
+            // negative test - message for partition that does not exist in
+            // partition dependency cache
+            jsonMsg = new JSONAddPartitionMessage(dep.getServer(), "", dep.getDb(), dep.getTable(), partitions,
+                    System.currentTimeMillis());
+            msg = session.createTextMessage(jsonMsg.toString());
+            msg.setStringProperty(HCatConstants.HCAT_EVENT, HCatEventMessage.EventType.ADD_PARTITION.toString());
+
+            hcatHandler.process(msg);
+
+            PartitionsGroup pg = pdms.getHCatMap().get(PartitionWrapper.makePrefix(dep.getServer(), dep.getDb()))
+                    .get(dep.getTable());
+            assertFalse(pg.getPartitionsMap().containsKey(new PartitionWrapper(dep)));
+
+        }
+        catch (Exception e) {
+            fail("Exception caused " + e.getMessage());
+        }
+
+    }
+
+    /**
+     * Test that DROP_PARTITION and DROP_TABLE messages are handled by logging their event type.
+     * NOTE(review): no @Test here, unlike the sibling tests - confirm the runner still picks it up
+     */
+    public void testDropEventTypeMessage() {
+        try{
+            // Capture the statements logged by HCatMessageHandler in an in-memory log4j appender.
+            // NOTE(review): the appender is never detached from the logger afterwards
+            Logger logger = Logger.getLogger(HCatMessageHandler.class);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            Layout layout = new SimpleLayout();
+            Appender appender = new WriterAppender(layout, out);
+            logger.addAppender(appender);
+
+            Message msg = session.createTextMessage("{" +
+                "\"server\" : \"thrift://localhost:1234\"," +
+                "\"db\" : \"default\"," +
+                "\"table\" : \"newTable\"," +
+                "\"timestamp\" : \"123456\"," +
+                "\"partitions\" : [{ \"dt\" : \"2012_01_01\", \"grid\" : \"AB\" }]" +
+                "}");
+            msg.setStringProperty(HCatConstants.HCAT_EVENT, HCatEventMessage.EventType.DROP_PARTITION.toString());
+
+            HCatMessageHandler hcatHandler = new HCatMessageHandler();
+            hcatHandler.process(msg);
+            // check the captured log output names the event type (logged at INFO, not as an error)
+            String logMsg = out.toString();
+            assertTrue(logMsg.contains(HCatEventMessage.EventType.DROP_PARTITION.toString()));
+
+            msg.setStringProperty(HCatConstants.HCAT_EVENT, HCatEventMessage.EventType.DROP_TABLE.toString());
+            hcatHandler.process(msg);
+            // same check for DROP_TABLE; 'out' still holds the earlier DROP_PARTITION output as well
+            logMsg = out.toString();
+            assertTrue(logMsg.contains(HCatEventMessage.EventType.DROP_TABLE.toString()));
+        }
+        catch (Exception e) {
+            fail("Exception caused " + e.getMessage());
+        }
+    }
+}

Modified: oozie/branches/hcat-intre/pom.xml
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/pom.xml?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/pom.xml (original)
+++ oozie/branches/hcat-intre/pom.xml Tue Nov 27 07:40:35 2012
@@ -81,6 +81,7 @@
          <hive.version>0.9.0</hive.version>
          <pig.version>0.9.0</pig.version>
          <webhcat.version>0.5.0-SNAPSHOT</webhcat.version>
+         <hcatalog.version>0.5.0-SNAPSHOT</hcatalog.version>
          <sqoop.version>1.5.0-incubating-SNAPSHOT</sqoop.version>
          <streaming.version>${hadoop.version}</streaming.version>
          <distcp.version>${hadooplib.version}</distcp.version>
@@ -278,6 +279,12 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.hcatalog</groupId>
+                <artifactId>hcatalog-core</artifactId>
+                <version>${hcatalog.version}</version>
+            </dependency>
+
+            <dependency>
                 <groupId>org.apache.openjpa</groupId>
                 <artifactId>openjpa-persistence</artifactId>
                 <version>2.1.0</version>

Modified: oozie/branches/hcat-intre/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1414006&r1=1414005&r2=1414006&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Tue Nov 27 07:40:35 2012
@@ -1,5 +1,6 @@
 -- Oozie 3.4.0 release (trunk - unreleased)
 
+OOZIE-1050 Implement logic to update dependencies via push JMS message(mona via mohammad)
 OOZIE-1068 Metadata Accessor service for HCatalog(mohammad)
 OOZIE-1069 Update dataIn and dataOut EL functions to support partitions (mohammad)
 OOZIE-1043 Add logic to register to Missing Dependency Structure in coord action materialization (ryota via mohammad)