You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ka...@apache.org on 2012/11/12 20:40:45 UTC
svn commit: r1408417 - in /oozie/branches/hcat-intre: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/executor/jpa/
core/src/main/java/org/apache/oozie/service/ core/src/mai...
Author: kamrul
Date: Mon Nov 12 19:40:44 2012
New Revision: 1408417
URL: http://svn.apache.org/viewvc?rev=1408417&view=rev
Log:
OOZIE-1056 Command to update push-based dependency (mohammad)
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
oozie/branches/hcat-intre/release-log.txt
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java Mon Nov 12 19:40:44 2012
@@ -56,6 +56,8 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.status =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
// Update query for InputCheck
@NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"),
+ // Update query for Push-based missing dependency check
+ @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
// Update query for Start
@NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.status =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage where w.id = :id"),
@@ -73,7 +75,7 @@ import org.apache.openjpa.persistence.jd
// Select Query used by Timeout command
@NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.status, a.runConf, a.pending from CoordinatorActionBean a where a.id = :id"),
// Select query used by InputCheck command
- @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
+ @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
// Select query used by CoordActionUpdate command
@NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml from CoordinatorActionBean a where a.externalId = :externalId"),
// Select query used by Check command
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java Mon Nov 12 19:40:44 2012
@@ -197,6 +197,8 @@ public enum ErrorCode {
E1021(XLog.STD, "Coord Action Input Check Error: {0}"),
E1022(XLog.STD, "Cannot delete running/completed coordinator action: [{0}]"),
E1023(XLog.STD, "Coord Action push Input Check Error: {0}"),
+ E1024(XLog.STD, "Service not yet initialized : {0}"),
+ E1025(XLog.STD, "URI parsing error : {0}"),
E1100(XLog.STD, "Command precondition does not hold before execution, [{0}]"),
@@ -229,7 +231,7 @@ public enum ErrorCode {
E1501(XLog.STD, "Partition Dependency Manager could not add cache entry"),
E1502(XLog.STD, "Partition cache lookup error"),
- E1503(XLog.STD, "Error in Metadata URI"),
+ E1503(XLog.STD, "Error in Metadata URI [{0}]"),
ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id [{0}]"),;
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java?rev=1408417&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java Mon Nov 12 19:40:44 2012
@@ -0,0 +1,180 @@
+package org.apache.oozie.command.coord;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.PartitionWrapper;
+
+public class CoordActionUpdatePushMissingDependency extends CoordinatorXCommand<Void> {
+
+ private String actionId;
+ private JPAService jpaService = null;
+ private CoordinatorActionBean coordAction = null;
+
+ public CoordActionUpdatePushMissingDependency(String actionId) {
+ super("coord_action_push_md", "coord_action_push_md", 0);
+ this.actionId = actionId;
+ }
+
+ @Override
+ protected Void execute() throws CommandException {
+ // TODO: Get the list of action from Available map
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ if (pdms == null) {
+ throw new CommandException(ErrorCode.E1024, "PartitionDependencyManagerService");
+ }
+ List<PartitionWrapper> availPartitionList = pdms.getAvailablePartitions(actionId);
+ pdms.getAvailablePartitions(actionId);
+ if (availPartitionList == null || availPartitionList.size() == 0) {
+ LOG.info("There is no available partitionList of action ID: [{0}]", actionId);
+ handleTimeout();
+ return null;
+ }
+ String missPartitions = coordAction.getPushMissingDependencies();
+ LOG.debug("Updating action Id " + actionId + " for available partition of " + availPartitionList.toString()
+ + "missing parts :" + missPartitions);
+ String newMissPartitions = removePartitions(missPartitions, availPartitionList);
+ coordAction.setPushMissingDependencies(newMissPartitions);
+ String otherDeps = coordAction.getMissingDependencies();
+ if ((newMissPartitions == null || newMissPartitions.trim().length() == 0)
+ && (otherDeps == null || otherDeps.trim().length() == 0)) {
+ coordAction.setStatus(CoordinatorAction.Status.READY);
+ // pass jobID to the CoordActionReadyXCommand
+ queue(new CoordActionReadyXCommand(coordAction.getJobId()), 100);
+ }
+ else {
+ handleTimeout();
+ }
+ coordAction.setLastModifiedTime(new Date());
+ if (jpaService != null) {
+ try {
+ jpaService.execute(new CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
+ }
+ catch (JPAExecutorException jex) {
+ throw new CommandException(ErrorCode.E1023, jex.getMessage(), jex);
+ }
+ finally {
+ // remove from Available map as it is being persisted
+ if (pdms.removeActionFromAvailPartitions(actionId)) {
+ LOG.debug("Succesfully removed actionId: [{0}] from available Map ", actionId);
+ }
+ else {
+ LOG.warn("Unable to remove actionId: [{0}] from available Map ", actionId);
+ }
+ }
+ }
+ return null;
+ }
+
+ // TODO: Revisit efficiency
+ private String removePartitions(String missPartitions, List<PartitionWrapper> availPartitionList)
+ throws CommandException {
+ if (missPartitions == null || missPartitions.length() == 0) {
+ LOG.warn("Missing dependency is empty. avaialableMap is :" + availPartitionList);
+ return null;
+ }
+ List<PartitionWrapper> missPartList = createPartitionWrapper(missPartitions);
+ StringBuilder ret = new StringBuilder();
+ for (PartitionWrapper part : availPartitionList) {
+ if (missPartList.contains(part)) {
+ missPartList.remove(part);
+ LOG.debug("Removing partition " + part);
+ }
+ else {
+ LOG.warn("NOT found partition [{0}] into missingList: [{1}] ", part, missPartList);
+ }
+ }
+ for (PartitionWrapper missParts : missPartList) {
+ if (ret.length() > 0) {
+ ret.append(CoordELFunctions.DIR_SEPARATOR);
+ }
+ ret.append(missParts.toString());
+ }
+ return ret.toString();
+ }
+
+ private List<PartitionWrapper> createPartitionWrapper(String missPartitions) throws CommandException {
+ String[] parts = missPartitions.split(CoordELFunctions.DIR_SEPARATOR);
+ List<PartitionWrapper> ret = new ArrayList<PartitionWrapper>();
+ for (String partURI : parts) {
+ try {
+ ret.add(new PartitionWrapper(partURI));
+ }
+ catch (URISyntaxException e) {
+ throw new CommandException(ErrorCode.E1025, e);
+ }
+ }
+ return ret;
+ }
+
+ private void handleTimeout() {
+ long waitingTime = (new Date().getTime() - Math.max(coordAction.getNominalTime().getTime(), coordAction
+ .getCreatedTime().getTime()))
+ / (60 * 1000);
+ int timeOut = coordAction.getTimeOut();
+ if ((timeOut >= 0) && (waitingTime > timeOut)) {
+ queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ }
+ }
+
+ @Override
+ protected boolean isLockRequired() {
+ return true;
+ }
+
+ @Override
+ public String getEntityKey() {
+ return actionId;
+ }
+
+ @Override
+ protected void loadState() throws CommandException {
+ try {
+ jpaService = Services.get().get(JPAService.class);
+ if (jpaService != null) {
+ coordAction = jpaService.execute(new CoordActionGetForInputCheckJPAExecutor(actionId));
+ LogUtils.setLogInfo(coordAction, logInfo);
+ }
+ else {
+ throw new CommandException(ErrorCode.E0610);
+ }
+ }
+ catch (XException ex) {
+ throw new CommandException(ex);
+ }
+
+ }
+
+ @Override
+ protected void verifyPrecondition() throws CommandException, PreconditionException {
+ if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
+ throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+ + "]::CoordActionInputCheck:: Ignoring action. Should be in WAITING state, but state="
+ + coordAction.getStatus());
+ }
+ // TODO: check the parent coordinator job?
+ }
+
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForInputCheckJPAExecutor.java Mon Nov 12 19:40:44 2012
@@ -94,7 +94,10 @@ public class CoordActionGetForInputCheck
bean.setMissingDependencies((String) arr[7]);
}
if (arr[8] != null) {
- bean.setTimeOut((Integer) arr[8]);
+ bean.setPushMissingDependencies((String) arr[8]);
+ }
+ if (arr[9] != null) {
+ bean.setTimeOut((Integer) arr[9]);
}
return bean;
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java Mon Nov 12 19:40:44 2012
@@ -82,6 +82,7 @@ public class CoordActionGetJPAExecutor i
action.setCreatedConf(a.getCreatedConf());
action.setExternalStatus(a.getExternalStatus());
action.setMissingDependencies(a.getMissingDependencies());
+ action.setPushMissingDependencies(a.getPushMissingDependencies());
action.setRunConf(a.getRunConf());
action.setTimeOut(a.getTimeOut());
action.setTrackerUri(a.getTrackerUri());
Added: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java?rev=1408417&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java (added)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionUpdatePushInputCheckJPAExecutor.java Mon Nov 12 19:40:44 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.Date;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Updates the action status, pending status, last modified time, push
+ * missingDependencies of CoordinatorAction and persists it. It executes SQL
+ * update query and return type is Void.
+ */
+public class CoordActionUpdatePushInputCheckJPAExecutor implements JPAExecutor<Void> {
+
+ private CoordinatorActionBean coordAction = null;
+
+ public CoordActionUpdatePushInputCheckJPAExecutor(CoordinatorActionBean coordAction) {
+ ParamChecker.notNull(coordAction, "coordAction");
+ this.coordAction = coordAction;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
+ * EntityManager)
+ */
+ @Override
+ public Void execute(EntityManager em) throws JPAExecutorException {
+ try {
+ Query q = em.createNamedQuery("UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK");
+ q.setParameter("id", coordAction.getId());
+ q.setParameter("status", coordAction.getStatus().toString());
+ q.setParameter("lastModifiedTime", new Date());
+ q.setParameter("pushMissingDependencies", coordAction.getPushMissingDependencies());
+ q.executeUpdate();
+ // Since the return type is Void, we have to return null
+ return null;
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+ */
+ @Override
+ public String getName() {
+ return "CoordActionUpdatePushInputCheckJPAExecutor";
+ }
+}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java Mon Nov 12 19:40:44 2012
@@ -101,6 +101,31 @@ public class PartitionDependencyManagerS
}
/**
+ * Returns a list of partitions for an actionId. 'null' if there is nothing
+ *
+ * @param actionId
+ * @return List of partitions
+ */
+ public List<PartitionWrapper> getAvailablePartitions(String actionId) {
+ return availMap.get(actionId);
+ }
+
+ /**
+ * Remove en entry from available Map
+ *
+ * @param actionId
+ * @return true if the entry exists , otherwise false
+ */
+ public boolean removeActionFromAvailPartitions(String actionId) {
+ boolean ret = false;
+ if (availMap.containsKey(actionId)) {
+ availMap.remove(actionId);
+ ret = true;
+ }
+ return ret;
+ }
+
+ /**
* Adding missing partition entry specified by PartitionWrapper object
*
* @param partition
@@ -112,27 +137,11 @@ public class PartitionDependencyManagerS
String prefix = PartitionWrapper.makePrefix(partition.getServerName(), partition.getDbName());
Map<String, PartitionsGroup> tablePartitionsMap;
String tableName = partition.getTableName();
- PartitionsGroup missingPartitions = null;
- WaitingActions actionsList;
try {
if (hcatInstanceMap.containsKey(prefix)) {
tablePartitionsMap = hcatInstanceMap.get(prefix);
if (tablePartitionsMap.containsKey(tableName)) {
- actionsList = _getActionsForPartition(tablePartitionsMap, tableName, missingPartitions, partition);
- if(missingPartitions != null) {
- if(actionsList != null) {
- // partition exists, therefore append action
- actionsList.addAndUpdate(actionId);
- }
- else {
- // new partition entry and info
- actionsList = new WaitingActions(actionId);
- missingPartitions.addPartitionAndAction(partition, actionsList);
- }
- }
- else {
- log.warn("No partition entries for table [{0}]", tableName);
- }
+ addPartitionEntry(tablePartitionsMap, tableName, partition, actionId);
}
else { // new table entry
tablePartitionsMap = new ConcurrentHashMap<String, PartitionsGroup>();
@@ -296,7 +305,7 @@ public class PartitionDependencyManagerS
return true;
}
else {
- log.warn("HCat Partition [{0}] not found", partition.toString());
+ log.warn("partitionAvailable: HCat Partition [{0}] not found", partition.toString());
}
}
else {
@@ -331,6 +340,29 @@ public class PartitionDependencyManagerS
return partitionAvailable(partition);
}
+ private void addPartitionEntry(Map<String, PartitionsGroup> tableMap, String tableName, PartitionWrapper partition,
+ String actionId) {
+ WaitingActions actionsList = null;
+ PartitionsGroup missingPartitions = tableMap.get(tableName);
+ if (missingPartitions != null && missingPartitions.getPartitionsMap().containsKey(partition)) {
+ actionsList = missingPartitions.getPartitionsMap().get(partition);
+ if (actionsList != null) {
+ // partition exists, therefore append action
+ actionsList.addAndUpdate(actionId);
+ }
+ }
+ else {
+ if (missingPartitions != null) {
+ // new partition entry and info
+ actionsList = new WaitingActions(actionId);
+ missingPartitions.addPartitionAndAction(partition, actionsList);
+ }
+ else {
+ log.warn(" missingPartitions for table [{0}] is NULL.", tableName);
+ }
+ }
+ }
+
private WaitingActions _getActionsForPartition(Map<String, PartitionsGroup> tableMap, String tableName,
PartitionsGroup missingPartitions, PartitionWrapper partition) {
WaitingActions actionsList = null;
@@ -339,7 +371,7 @@ public class PartitionDependencyManagerS
actionsList = missingPartitions.getPartitionsMap().get(partition);
}
else {
- log.warn("HCat Partition [{0}] not found", partition.toString());
+ log.warn( " _getActionsForPartition: HCat Partition [{0}] not found", partition.toString());
}
return actionsList;
}
Modified: oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java (original)
+++ oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java Mon Nov 12 19:40:44 2012
@@ -18,6 +18,7 @@
package org.apache.oozie.util;
+import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.Map;
@@ -47,6 +48,10 @@ public class PartitionWrapper {
this(hcatUri.getServer(), hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap());
}
+ public PartitionWrapper(String partURI) throws URISyntaxException {
+ this(new HCatURI(partURI));
+ }
+
/**
* @return the server name
*/
@@ -105,15 +110,7 @@ public class PartitionWrapper {
@Override
public String toString() {
- StringBuilder partString = new StringBuilder("");
- Iterator<Map.Entry<String, String>> it = partition.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<String, String> partEntry = it.next();
- partString.append(partEntry.getKey() + "=" + partEntry.getValue() + ";");
- }
- // adding prefix and removing the trailing ";"
- return makePrefix(serverName, dbName) + CONCATENATOR + tableName + CONCATENATOR
- + partString.substring(0, partString.length() - 1);
+ return HCatURI.getHCatURI(serverName, dbName, tableName, partition);
}
@Override
@@ -123,7 +120,7 @@ public class PartitionWrapper {
Map<String, String> p = pw.getPartition();
boolean equals = true;
if (this.serverName.equals(pw.serverName) && this.dbName.equals(pw.dbName)
- && this.tableName.equals(pw.tableName)) {
+ && this.tableName.equals(pw.tableName) && partition.size() == p.size()) {
while (it1.hasNext()) {
String key = it1.next().getKey();
if (!(p.containsKey(key) && p.get(key).equals(partition.get(key)))) {
Added: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java?rev=1408417&view=auto
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java (added)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java Mon Nov 12 19:40:44 2012
@@ -0,0 +1,208 @@
+package org.apache.oozie.command.coord;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.UUIDService;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.PartitionsGroup;
+import org.apache.oozie.util.XLog;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCoordActionUpdatePushMissingDependency extends XDataTestCase {
+ private String TZ;
+
+ @Before
+ protected void setUp() throws Exception {
+ super.setUp();
+ setSystemProperty(PartitionDependencyManagerService.HCAT_DEFAULT_SERVER_NAME, "myhcatserver");
+ setSystemProperty(PartitionDependencyManagerService.HCAT_DEFAULT_DB_NAME, "myhcatdb");
+ setSystemProperty(PartitionDependencyManagerService.MAP_MAX_WEIGHTED_CAPACITY, "100");
+ Services services = new Services();
+ addServiceToRun(services.getConf(), PartitionDependencyManagerService.class.getName());
+ services.init();
+ TZ = (getProcessingTZ().equals(DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT)) ? "Z" : getProcessingTZ()
+ .substring(3);
+ }
+
+ @After
+ protected void tearDown() throws Exception {
+ Services.get().destroy();
+ super.tearDown();
+ }
+
+ @Test
+ public void testUpdateCoordTableBasic() throws Exception {
+ String newHCatDependency = "hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12®ion=us";
+
+ String actionId = addInitRecords(newHCatDependency);
+ checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING, 0);
+
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+
+ pdms.addMissingPartition(newHCatDependency, actionId);
+
+ pdms.partitionAvailable(newHCatDependency);
+ HCatURI hcatUri = new HCatURI(newHCatDependency);
+
+ List<PartitionWrapper> availParts = pdms.getAvailablePartitions(actionId);
+ assertNotNull(availParts);
+ assertEquals(availParts.get(0), new PartitionWrapper(hcatUri));
+
+ new CoordActionUpdatePushMissingDependency(actionId).call();
+
+ checkCoordAction(actionId, "", CoordinatorAction.Status.READY, 0);
+
+ }
+
+ @Test
+ public void testUpdateCoordTableAdvanced() throws Exception {
+ String newHCatDependency1 = "hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=11®ion=us";
+ String newHCatDependency2 = "hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12®ion=us";
+
+ String fullDeps = newHCatDependency1 + CoordELFunctions.DIR_SEPARATOR + newHCatDependency2;
+ String actionId = addInitRecords(fullDeps);
+ checkCoordAction(actionId, fullDeps, CoordinatorAction.Status.WAITING, 0);
+
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+
+ pdms.addMissingPartition(newHCatDependency1, actionId);
+ pdms.addMissingPartition(newHCatDependency2, actionId);
+
+ pdms.partitionAvailable(newHCatDependency2);
+ HCatURI hcatUri2 = new HCatURI(newHCatDependency2);
+
+ List<PartitionWrapper> availParts = pdms.getAvailablePartitions(actionId);
+ assertNotNull(availParts);
+ assertEquals(availParts.get(0), new PartitionWrapper(hcatUri2));
+
+ new CoordActionUpdatePushMissingDependency(actionId).call();
+
+ checkCoordAction(actionId, newHCatDependency1, CoordinatorAction.Status.WAITING, 1);
+
+ // second partition available
+
+ pdms.partitionAvailable(newHCatDependency1);
+ HCatURI hcatUri1 = new HCatURI(newHCatDependency1);
+
+ availParts = pdms.getAvailablePartitions(actionId);
+ assertNotNull(availParts);
+ assertEquals(availParts.get(0), new PartitionWrapper(hcatUri1));
+
+ new CoordActionUpdatePushMissingDependency(actionId).call();
+
+ checkCoordAction(actionId, "", CoordinatorAction.Status.READY, 0);
+
+ }
+
+ private CoordinatorActionBean checkCoordAction(String actionId, String expDeps, CoordinatorAction.Status stat,
+ int type) throws Exception {
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+ String missDeps = action.getPushMissingDependencies();
+ if (type != 0) {
+ assertEquals(new PartitionWrapper(missDeps), new PartitionWrapper(expDeps));
+ }
+ else {
+ assertEquals(missDeps, expDeps);
+ }
+ assertEquals(action.getStatus(), stat);
+
+ return action;
+ }
+ catch (JPAExecutorException se) {
+ throw new Exception("Action ID " + actionId + " was not stored properly in db");
+ }
+ }
+
+ private String addInitRecords(String pushMissingDependencies) throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59" + TZ);
+ Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59" + TZ);
+ CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+ CoordinatorJob.Status.RUNNING, startTime, endTime, false, true, 3);
+
+ CoordinatorActionBean action1 = addRecordToCoordActionTableForWaiting(job.getId(), 1,
+ CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", pushMissingDependencies);
+ return action1.getId();
+ }
+
+ protected CoordinatorActionBean addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
+ CoordinatorAction.Status status, String resourceXmlName, String pushMissingDependencies) throws Exception {
+ CoordinatorActionBean action = createCoordAction(jobId, actionNum, status, resourceXmlName, 0, TZ);
+ action.setPushMissingDependencies(pushMissingDependencies);
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ CoordActionInsertJPAExecutor coordActionInsertCmd = new CoordActionInsertJPAExecutor(action);
+ jpaService.execute(coordActionInsertCmd);
+ }
+ catch (JPAExecutorException je) {
+ je.printStackTrace();
+ fail("Unable to insert the test coord action record to table");
+ throw je;
+ }
+ return action;
+ }
+
+ protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String testFileName, CoordinatorJob.Status status,
+ Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception {
+
+ String testDir = getTestCaseDir();
+ CoordinatorJobBean coordJob = createCoordJob(testFileName, status, start, end, pending, doneMatd, lastActionNum);
+ String appXml = getCoordJobXmlForWaiting(testFileName, testDir);
+ coordJob.setJobXml(appXml);
+
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(coordJob);
+ jpaService.execute(coordInsertCmd);
+ }
+ catch (JPAExecutorException je) {
+ je.printStackTrace();
+ fail("Unable to insert the test coord job record to table");
+ throw je;
+ }
+
+ return coordJob;
+ }
+
+ protected String getCoordJobXmlForWaiting(String testFileName, String testDir) {
+ try {
+ Reader reader = IOUtils.getResourceAsReader(testFileName, -1);
+ String appXml = IOUtils.getReaderAsString(reader, -1);
+ appXml = appXml.replaceAll("#testDir", testDir);
+ return appXml;
+ }
+ catch (IOException ioe) {
+ throw new RuntimeException(XLog.format("Could not get " + testFileName, ioe));
+ }
+ }
+
+ protected String getProcessingTZ() {
+ return DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT;
+ }
+}
Modified: oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java (original)
+++ oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java Mon Nov 12 19:40:44 2012
@@ -577,6 +577,7 @@ public abstract class XDataTestCase exte
throw new IOException(e);
}
action.setLastModifiedTime(new Date());
+ action.setCreatedTime(new Date());
action.setStatus(status);
action.setActionXml(actionXml);
Modified: oozie/branches/hcat-intre/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1408417&r1=1408416&r2=1408417&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Mon Nov 12 19:40:44 2012
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-1056 Command to update push-based dependency (mohammad)
OOZIE-1059 Add static method to create URI String in HCatURI(ryota via mohammad)
OOZIE-1039 Implement the Missing Dependency structure for HCat partitions (mona via mohammad)
OOZIE-1028 Add EL function to allow date ranges to be used for dataset ranges (rkanter via tucu)