You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ka...@apache.org on 2012/11/12 20:40:45 UTC

svn commit: r1408417 - in /oozie/branches/hcat-intre: ./ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/command/coord/ core/src/main/java/org/apache/oozie/executor/jpa/ core/src/main/java/org/apache/oozie/service/ core/src/mai...

Author: kamrul
Date: Mon Nov 12 19:40:44 2012
New Revision: 1408417

URL: http://svn.apache.org/viewvc?rev=1408417&view=rev
Log:
OOZIE-1056 Command to update push-based dependency (mohammad)

Added:
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
Modified:
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
    oozie/branches/hcat-intre/release-log.txt

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java Mon Nov 12 19:40:44 2012
@@ -56,6 +56,8 @@ import org.apache.openjpa.persistence.jd
         @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.status =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
         // Update query for InputCheck
         @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"),
+        // Update query for Push-based missing dependency check
+        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
         // Update query for Start
         @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.status =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage  where w.id = :id"),
 
@@ -73,7 +75,7 @@ import org.apache.openjpa.persistence.jd
         // Select Query used by Timeout command
         @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.status, a.runConf, a.pending from CoordinatorActionBean a where a.id = :id"),
         // Select query used by InputCheck command
-        @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
+        @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
         // Select query used by CoordActionUpdate command
         @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml from CoordinatorActionBean a where a.externalId = :externalId"),
         // Select query used by Check command

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java Mon Nov 12 19:40:44 2012
@@ -197,6 +197,8 @@ public enum ErrorCode {
     E1021(XLog.STD, "Coord Action Input Check Error: {0}"),
     E1022(XLog.STD, "Cannot delete running/completed coordinator action: [{0}]"),
     E1023(XLog.STD, "Coord Action push Input Check Error: {0}"),
+    E1024(XLog.STD, "Service not yet initialized : {0}"),
+    E1025(XLog.STD, "URI parsing error : {0}"),
 
     E1100(XLog.STD, "Command precondition does not hold before execution, [{0}]"),
 
@@ -229,7 +231,7 @@ public enum ErrorCode {
 
     E1501(XLog.STD, "Partition Dependency Manager could not add cache entry"),
     E1502(XLog.STD, "Partition cache lookup error"),
-    E1503(XLog.STD, "Error in Metadata URI"),
+    E1503(XLog.STD, "Error in Metadata URI [{0}]"),
 
     ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id [{0}]"),;
 

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java?rev=1408417&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java Mon Nov 12 19:40:44 2012
@@ -0,0 +1,180 @@
+package org.apache.oozie.command.coord;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.PartitionWrapper;
+
+public class CoordActionUpdatePushMissingDependency extends CoordinatorXCommand<Void> {
+
+    private String actionId;
+    private JPAService jpaService = null;
+    private CoordinatorActionBean coordAction = null;
+
+    public CoordActionUpdatePushMissingDependency(String actionId) {
+        super("coord_action_push_md", "coord_action_push_md", 0);
+        this.actionId = actionId;
+    }
+
+    @Override
+    protected Void execute() throws CommandException {
+        // TODO: Get the list of action from Available map
+        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+        if (pdms == null) {
+            throw new CommandException(ErrorCode.E1024, "PartitionDependencyManagerService");
+        }
+        List<PartitionWrapper> availPartitionList = pdms.getAvailablePartitions(actionId);
+        pdms.getAvailablePartitions(actionId);
+        if (availPartitionList == null || availPartitionList.size() == 0) {
+            LOG.info("There is no available partitionList of action ID: [{0}]", actionId);
+            handleTimeout();
+            return null;
+        }
+        String missPartitions = coordAction.getPushMissingDependencies();
+        LOG.debug("Updating action Id " + actionId + " for available partition of " + availPartitionList.toString()
+                + "missing parts :" + missPartitions);
+        String newMissPartitions = removePartitions(missPartitions, availPartitionList);
+        coordAction.setPushMissingDependencies(newMissPartitions);
+        String otherDeps = coordAction.getMissingDependencies();
+        if ((newMissPartitions == null || newMissPartitions.trim().length() == 0)
+                && (otherDeps == null || otherDeps.trim().length() == 0)) {
+            coordAction.setStatus(CoordinatorAction.Status.READY);
+            // pass jobID to the CoordActionReadyXCommand
+            queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
+        }
+        else {
+            handleTimeout();
+        }
+        coordAction.setLastModifiedTime(new Date());
+        if (jpaService != null) {
+            try {
+                jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
+            }
+            catch (JPAExecutorException jex) {
+                throw new CommandException(ErrorCode.E1023, jex.getMessage(), jex);
+            }
+            finally {
+                // remove from Available map as it is being persisted
+                if (pdms.removeActionFromAvailPartitions(actionId)) {
+                    LOG.debug("Succesfully removed actionId: [{0}] from available Map ", actionId);
+                }
+                else {
+                    LOG.warn("Unable to remove actionId: [{0}] from available Map ", actionId);
+                }
+            }
+        }
+        return null;
+    }
+
+    // TODO: Revisit efficiency
+    private String removePartitions(String missPartitions, List<PartitionWrapper> availPartitionList)
+            throws CommandException {
+        if (missPartitions == null || missPartitions.length() == 0) {
+            LOG.warn("Missing dependency is empty. avaialableMap is :" + availPartitionList);
+            return null;
+        }
+        List<PartitionWrapper> missPartList = createPartitionWrapper(missPartitions);
+        StringBuilder ret = new StringBuilder();
+        for (PartitionWrapper part : availPartitionList) {
+            if (missPartList.contains(part)) {
+                missPartList.remove(part);
+                LOG.debug("Removing partition " + part);
+            }
+            else {
+                LOG.warn("NOT found partition [{0}] into missingList: [{1}] ", part, missPartList);
+            }
+        }
+        for (PartitionWrapper missParts : missPartList) {
+            if (ret.length() > 0) {
+                ret.append(CoordELFunctions.DIR_SEPARATOR);
+            }
+            ret.append(missParts.toString());
+        }
+        return ret.toString();
+    }
+
+    private List<PartitionWrapper> createPartitionWrapper(String missPartitions) throws CommandException {
+        String[] parts = missPartitions.split(CoordELFunctions.DIR_SEPARATOR);
+        List<PartitionWrapper> ret = new ArrayList<PartitionWrapper>();
+        for (String partURI : parts) {
+            try {
+                ret.add(new PartitionWrapper(partURI));
+            }
+            catch (URISyntaxException e) {
+                throw new CommandException(ErrorCode.E1025, e);
+            }
+        }
+        return ret;
+    }
+
+    private void handleTimeout() {
+        long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
+                .getCreatedTime().getTime()))
+                / (60 * 1000);
+        int timeOut = coordAction.getTimeOut();
+        if ((timeOut >= 0) && (waitingTime > timeOut)) {
+            queue(new CoordActionTimeOutXCommand(coordAction), 100);
+        }
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return actionId;
+    }
+
+    @Override
+    protected void loadState() throws CommandException {
+        try {
+            jpaService = Services.get().get(JPAService.class);
+            if (jpaService != null) {
+                coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
+                LogUtils.setLogInfo(coordAction, logInfo);
+            }
+            else {
+                throw new CommandException(ErrorCode.E0610);
+            }
+        }
+        catch (XException ex) {
+            throw new CommandException(ex);
+        }
+
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
+            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+                    + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
+                    + coordAction.getStatus());
+        }
+        // TODO: check the parent coordinator job?
+    }
+
+}

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java Mon Nov 12 19:40:44 2012
@@ -94,7 +94,10 @@ public class CoordActionGetForInputCheck
             bean.setMissingDependencies((String) arr[7]);
         }
         if (arr[8] != null) {
-            bean.setTimeOut((Integer) arr[8]);
+            bean.setPushMissingDependencies((String) arr[8]);
+        }
+        if (arr[9] != null) {
+            bean.setTimeOut((Integer) arr[9]);
         }
         return bean;
     }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java Mon Nov 12 19:40:44 2012
@@ -82,6 +82,7 @@ public class CoordActionGetJPAExecutor i
             action.setCreatedConf(a.getCreatedConf());
             action.setExternalStatus(a.getExternalStatus());
             action.setMissingDependencies(a.getMissingDependencies());
+            action.setPushMissingDependencies(a.getPushMissingDependencies());
             action.setRunConf(a.getRunConf());
             action.setTimeOut(a.getTimeOut());
             action.setTrackerUri(a.getTrackerUri());

Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java?rev=1408417&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java Mon Nov 12 19:40:44 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Date;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Updates the action status, pending status, last modified time, push
+ * missingDependencies of CoordinatorAction and persists it. It executes SQL
+ * update query and return type is Void.
+ */
+public class CoordActionUpdatePushInputCheckJPAExecutor implements JPAExecutor<Void> {
+
+    private CoordinatorActionBean coordAction = null;
+
+    public CoordActionUpdatePushInputCheckJPAExecutor(CoordinatorActionBean coordAction) {
+        ParamChecker.notNull(coordAction, "coordAction");
+        this.coordAction = coordAction;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+     * EntityManager)
+     */
+    @Override
+    public Void execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query q = em.createNamedQuery("UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK");
+            q.setParameter("id", coordAction.getId());
+            q.setParameter("status", coordAction.getStatus().toString());
+            q.setParameter("lastModifiedTime", new Date());
+            q.setParameter("pushMissingDependencies", coordAction.getPushMissingDependencies());
+            q.executeUpdate();
+            // Since the return type is Void, we have to return null
+            return null;
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "CoordActionUpdatePushInputCheckJPAExecutor";
+    }
+}

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java Mon Nov 12 19:40:44 2012
@@ -101,6 +101,31 @@ public class PartitionDependencyManagerS
     }
 
     /**
+     * Returns a list of partitions for an actionId. 'null' if there is nothing
+     *
+     * @param actionId
+     * @return List of partitions
+     */
+    public List<PartitionWrapper> getAvailablePartitions(String actionId) {
+        return availMap.get(actionId);
+    }
+
+    /**
+     * Remove en entry from available Map
+     *
+     * @param actionId
+     * @return true if the entry exists , otherwise false
+     */
+    public boolean removeActionFromAvailPartitions(String actionId) {
+        boolean ret = false;
+        if (availMap.containsKey(actionId)) {
+            availMap.remove(actionId);
+            ret = true;
+        }
+        return ret;
+    }
+
+ /**
      * Adding missing partition entry specified by PartitionWrapper object
      *
      * @param partition
@@ -112,27 +137,11 @@ public class PartitionDependencyManagerS
         String prefix = PartitionWrapper.makePrefix(partition.getServerName(), partition.getDbName());
         Map<String, PartitionsGroup> tablePartitionsMap;
         String tableName = partition.getTableName();
-        PartitionsGroup missingPartitions = null;
-        WaitingActions actionsList;
         try {
             if (hcatInstanceMap.containsKey(prefix)) {
                 tablePartitionsMap = hcatInstanceMap.get(prefix);
                 if (tablePartitionsMap.containsKey(tableName)) {
-                    actionsList = _getActionsForPartition(tablePartitionsMap, tableName, missingPartitions, partition);
-                    if(missingPartitions != null) {
-                        if(actionsList != null) {
-                            // partition exists, therefore append action
-                            actionsList.addAndUpdate(actionId);
-                        }
-                        else {
-                            // new partition entry and info
-                            actionsList = new WaitingActions(actionId);
-                            missingPartitions.addPartitionAndAction(partition, actionsList);
-                        }
-                    }
-                    else {
-                        log.warn("No partition entries for table [{0}]", tableName);
-                    }
+                    addPartitionEntry(tablePartitionsMap, tableName, partition, actionId);
                 }
                 else { // new table entry
                     tablePartitionsMap = new ConcurrentHashMap<String, PartitionsGroup>();
@@ -296,7 +305,7 @@ public class PartitionDependencyManagerS
                     return true;
                 }
                 else {
-                    log.warn("HCat Partition [{0}] not found", partition.toString());
+                    log.warn("partitionAvailable: HCat Partition [{0}] not found", partition.toString());
                 }
             }
             else {
@@ -331,6 +340,29 @@ public class PartitionDependencyManagerS
         return partitionAvailable(partition);
     }
 
+    private void addPartitionEntry(Map<String, PartitionsGroup> tableMap, String tableName, PartitionWrapper partition,
+            String actionId) {
+        WaitingActions actionsList = null;
+        PartitionsGroup missingPartitions = tableMap.get(tableName);
+        if (missingPartitions != null && missingPartitions.getPartitionsMap().containsKey(partition)) {
+            actionsList = missingPartitions.getPartitionsMap().get(partition);
+            if (actionsList != null) {
+                // partition exists, therefore append action
+                actionsList.addAndUpdate(actionId);
+            }
+        }
+        else {
+            if (missingPartitions != null) {
+                // new partition entry and info
+                actionsList = new WaitingActions(actionId);
+                missingPartitions.addPartitionAndAction(partition, actionsList);
+            }
+            else {
+                log.warn(" missingPartitions for table [{0}]  is NULL.", tableName);
+            }
+        }
+    }
+
     private WaitingActions _getActionsForPartition(Map<String, PartitionsGroup> tableMap, String tableName,
             PartitionsGroup missingPartitions, PartitionWrapper partition) {
         WaitingActions actionsList = null;
@@ -339,7 +371,7 @@ public class PartitionDependencyManagerS
             actionsList = missingPartitions.getPartitionsMap().get(partition);
         }
         else {
-            log.warn("HCat Partition [{0}] not found", partition.toString());
+            log.warn( " _getActionsForPartition: HCat Partition [{0}] not found", partition.toString());
         }
         return actionsList;
     }

Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java Mon Nov 12 19:40:44 2012
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.util;
 
+import java.net.URISyntaxException;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -47,6 +48,10 @@ public class PartitionWrapper {
         this(hcatUri.getServer(), hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap());
     }
 
+    public PartitionWrapper(String partURI) throws URISyntaxException {
+        this(new HCatURI(partURI));
+    }
+
     /**
      * @return the server name
      */
@@ -105,15 +110,7 @@ public class PartitionWrapper {
 
     @Override
     public String toString() {
-        StringBuilder partString = new StringBuilder("");
-        Iterator<Map.Entry<String, String>> it = partition.entrySet().iterator();
-        while (it.hasNext()) {
-            Map.Entry<String, String> partEntry = it.next();
-            partString.append(partEntry.getKey() + "=" + partEntry.getValue() + ";");
-        }
-        // adding prefix and removing the trailing ";"
-        return makePrefix(serverName, dbName) + CONCATENATOR + tableName + CONCATENATOR
-                + partString.substring(0, partString.length() - 1);
+        return HCatURI.getHCatURI(serverName, dbName, tableName, partition);
     }
 
     @Override
@@ -123,7 +120,7 @@ public class PartitionWrapper {
         Map<String, String> p = pw.getPartition();
         boolean equals = true;
         if (this.serverName.equals(pw.serverName) && this.dbName.equals(pw.dbName)
-                && this.tableName.equals(pw.tableName)) {
+                && this.tableName.equals(pw.tableName) && partition.size() == p.size()) {
             while (it1.hasNext()) {
                 String key = it1.next().getKey();
                 if (!(p.containsKey(key) && p.get(key).equals(partition.get(key)))) {

Added: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java?rev=1408417&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java (added)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java Mon Nov 12 19:40:44 2012
@@ -0,0 +1,208 @@
+package org.apache.oozie.command.coord;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.UUIDService;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.PartitionsGroup;
+import org.apache.oozie.util.XLog;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCoordActionUpdatePushMissingDependency extends XDataTestCase {
+    private String TZ;
+
+    @Before
+    protected void setUp() throws Exception {
+        super.setUp();
+        setSystemProperty(PartitionDependencyManagerService.HCAT_DEFAULT_SERVER_NAME, "myhcatserver");
+        setSystemProperty(PartitionDependencyManagerService.HCAT_DEFAULT_DB_NAME, "myhcatdb");
+        setSystemProperty(PartitionDependencyManagerService.MAP_MAX_WEIGHTED_CAPACITY, "100");
+        Services services = new Services();
+        addServiceToRun(services.getConf(), PartitionDependencyManagerService.class.getName());
+        services.init();
+        TZ = (getProcessingTZ().equals(DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT)) ? "Z" : getProcessingTZ()
+                .substring(3);
+    }
+
+    @After
+    protected void tearDown() throws Exception {
+        Services.get().destroy();
+        super.tearDown();
+    }
+
+    @Test
+    public void testUpdateCoordTableBasic() throws Exception {
+        String newHCatDependency = "hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12&region=us";
+
+        String actionId = addInitRecords(newHCatDependency);
+        checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING, 0);
+
+        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+
+        pdms.addMissingPartition(newHCatDependency, actionId);
+
+        pdms.partitionAvailable(newHCatDependency);
+        HCatURI hcatUri = new HCatURI(newHCatDependency);
+
+        List<PartitionWrapper> availParts = pdms.getAvailablePartitions(actionId);
+        assertNotNull(availParts);
+        assertEquals(availParts.get(0), new PartitionWrapper(hcatUri));
+
+        new CoordActionUpdatePushMissingDependency(actionId).call();
+
+        checkCoordAction(actionId, "", CoordinatorAction.Status.READY, 0);
+
+    }
+
+    @Test
+    public void testUpdateCoordTableAdvanced() throws Exception {
+        String newHCatDependency1 = "hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=11&region=us";
+        String newHCatDependency2 = "hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12&region=us";
+
+        String fullDeps = newHCatDependency1 + CoordELFunctions.DIR_SEPARATOR + newHCatDependency2;
+        String actionId = addInitRecords(fullDeps);
+        checkCoordAction(actionId, fullDeps, CoordinatorAction.Status.WAITING, 0);
+
+        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+
+        pdms.addMissingPartition(newHCatDependency1, actionId);
+        pdms.addMissingPartition(newHCatDependency2, actionId);
+
+        pdms.partitionAvailable(newHCatDependency2);
+        HCatURI hcatUri2 = new HCatURI(newHCatDependency2);
+
+        List<PartitionWrapper> availParts = pdms.getAvailablePartitions(actionId);
+        assertNotNull(availParts);
+        assertEquals(availParts.get(0), new PartitionWrapper(hcatUri2));
+
+        new CoordActionUpdatePushMissingDependency(actionId).call();
+
+        checkCoordAction(actionId, newHCatDependency1, CoordinatorAction.Status.WAITING, 1);
+
+        // second partition available
+
+        pdms.partitionAvailable(newHCatDependency1);
+        HCatURI hcatUri1 = new HCatURI(newHCatDependency1);
+
+        availParts = pdms.getAvailablePartitions(actionId);
+        assertNotNull(availParts);
+        assertEquals(availParts.get(0), new PartitionWrapper(hcatUri1));
+
+        new CoordActionUpdatePushMissingDependency(actionId).call();
+
+        checkCoordAction(actionId, "", CoordinatorAction.Status.READY, 0);
+
+    }
+
+    private CoordinatorActionBean checkCoordAction(String actionId, String expDeps, CoordinatorAction.Status stat,
+            int type) throws Exception {
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+            String missDeps = action.getPushMissingDependencies();
+            if (type != 0) {
+                assertEquals(new PartitionWrapper(missDeps), new PartitionWrapper(expDeps));
+            }
+            else {
+                assertEquals(missDeps, expDeps);
+            }
+            assertEquals(action.getStatus(), stat);
+
+            return action;
+        }
+        catch (JPAExecutorException se) {
+            throw new Exception("Action ID " + actionId + " was not stored properly in db");
+        }
+    }
+
+    private String addInitRecords(String pushMissingDependencies) throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59" + TZ);
+        Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59" + TZ);
+        CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+                CoordinatorJob.Status.RUNNING, startTime, endTime, false, true, 3);
+
+        CoordinatorActionBean action1 = addRecordToCoordActionTableForWaiting(job.getId(), 1,
+                CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", pushMissingDependencies);
+        return action1.getId();
+    }
+
+    protected CoordinatorActionBean addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
+            CoordinatorAction.Status status, String resourceXmlName, String pushMissingDependencies) throws Exception {
+        CoordinatorActionBean action = createCoordAction(jobId, actionNum, status, resourceXmlName, 0, TZ);
+        action.setPushMissingDependencies(pushMissingDependencies);
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            assertNotNull(jpaService);
+            CoordActionInsertJPAExecutor coordActionInsertCmd = new CoordActionInsertJPAExecutor(action);
+            jpaService.execute(coordActionInsertCmd);
+        }
+        catch (JPAExecutorException je) {
+            je.printStackTrace();
+            fail("Unable to insert the test coord action record to table");
+            throw je;
+        }
+        return action;
+    }
+
+    protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String testFileName, CoordinatorJob.Status status,
+            Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception {
+
+        String testDir = getTestCaseDir();
+        CoordinatorJobBean coordJob = createCoordJob(testFileName, status, start, end, pending, doneMatd, lastActionNum);
+        String appXml = getCoordJobXmlForWaiting(testFileName, testDir);
+        coordJob.setJobXml(appXml);
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            assertNotNull(jpaService);
+            CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob);
+            jpaService.execute(coordInsertCmd);
+        }
+        catch (JPAExecutorException je) {
+            je.printStackTrace();
+            fail("Unable to insert the test coord job record to table");
+            throw je;
+        }
+
+        return coordJob;
+    }
+
+    protected String getCoordJobXmlForWaiting(String testFileName, String testDir) {
+        try {
+            Reader reader = IOUtils.getResourceAsReader(testFileName, -1);
+            String appXml = IOUtils.getReaderAsString(reader, -1);
+            appXml = appXml.replaceAll("#testDir", testDir);
+            return appXml;
+        }
+        catch (IOException ioe) {
+            throw new RuntimeException(XLog.format("Could not get " + testFileName, ioe));
+        }
+    }
+
+    protected String getProcessingTZ() {
+        return DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT;
+    }
+}

Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java Mon Nov 12 19:40:44 2012
@@ -577,6 +577,7 @@ public abstract class XDataTestCase exte
             throw new IOException(e);
         }
         action.setLastModifiedTime(new Date());
+        action.setCreatedTime(new Date());
         action.setStatus(status);
         action.setActionXml(actionXml);
 

Modified: oozie/branches/hcat-intre/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Mon Nov 12 19:40:44 2012
@@ -1,5 +1,6 @@
 -- Oozie 3.4.0 release (trunk - unreleased)
 
+OOZIE-1056 Command to update push-based dependency (mohammad)
 OOZIE-1059 Add static method to create URI String in HCatURI(ryota via mohammad)
 OOZIE-1039 Implement the Missing Dependency structure for HCat partitions (mona via mohammad)
 OOZIE-1028 Add EL function to allow date ranges to be used for dataset ranges (rkanter via tucu)