You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/05/06 22:55:32 UTC
oozie git commit: OOZIE-2227 PartitionDependencyManagerService keeps on purging delete coord actions
Repository: oozie
Updated Branches:
refs/heads/master 6ea1ed833 -> 8d381099b
OOZIE-2227 PartitionDependencyManagerService keeps on purging delete coord actions
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8d381099
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8d381099
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8d381099
Branch: refs/heads/master
Commit: 8d381099b5541986535c70050d48269869ff007a
Parents: 6ea1ed8
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Wed May 6 13:55:16 2015 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Wed May 6 13:55:16 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/oozie/BundleEngine.java | 2 +-
.../org/apache/oozie/CoordinatorEngine.java | 2 +-
.../main/java/org/apache/oozie/DagEngine.java | 2 +-
.../PartitionDependencyManagerService.java | 13 ++++-
...TestHAPartitionDependencyManagerService.java | 57 +++++++++++++++++++-
release-log.txt | 1 +
6 files changed, 72 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/main/java/org/apache/oozie/BundleEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java
index bfb29e6..4f0f3bf 100644
--- a/core/src/main/java/org/apache/oozie/BundleEngine.java
+++ b/core/src/main/java/org/apache/oozie/BundleEngine.java
@@ -292,7 +292,7 @@ public class BundleEngine extends BaseEngine {
if (lastTime == null) {
lastTime = new Date();
}
- fetchLog(filter, job.getStartTime(), lastTime, writer, params, logType);
+ fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
}
catch (Exception ex) {
throw new IOException(ex);
http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
index cb67be0..482d6a9 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
@@ -347,7 +347,7 @@ public class CoordinatorEngine extends BaseEngine {
if (lastTime == null) {
lastTime = new Date();
}
- fetchLog(filter, job.getStartTime(), lastTime, writer, params, logType);
+ fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
}
catch (Exception e) {
throw new IOException(e);
http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/main/java/org/apache/oozie/DagEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java b/core/src/main/java/org/apache/oozie/DagEngine.java
index 08a126c..3d09206 100644
--- a/core/src/main/java/org/apache/oozie/DagEngine.java
+++ b/core/src/main/java/org/apache/oozie/DagEngine.java
@@ -458,7 +458,7 @@ public class DagEngine extends BaseEngine {
if (lastTime == null) {
lastTime = job.getLastModifiedTime();
}
- fetchLog(filter, job.getStartTime(), lastTime, writer, params, logType);
+ fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
}
catch (Exception e) {
throw new IOException(e);
http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java b/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
index c8f2b20..a350772 100644
--- a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
+++ b/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
@@ -18,6 +18,7 @@
package org.apache.oozie.service;
+import java.text.MessageFormat;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
@@ -27,9 +28,11 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
import org.apache.oozie.dependency.hcat.HCatDependencyCache;
@@ -119,7 +122,15 @@ public class PartitionDependencyManagerService implements Service {
caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_STATUS, actionId);
}
catch (JPAExecutorException e) {
- LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e);
+ if (e.getErrorCode() == ErrorCode.E0605) {
+ LOG.info(MessageFormat.format(
+ "Coord action {0} is not in database, deleting it from cache", actionId));
+ staleActions.add(actionId);
+ actionItr.remove();
+ }
+ else {
+ LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e);
+ }
}
if(caBean != null && !caBean.getStatus().equals(CoordinatorAction.Status.WAITING)){
staleActions.add(actionId);
http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
index 031e3e4..d681d42 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
@@ -28,9 +28,10 @@ import org.apache.oozie.client.CoordinatorAction.Status;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.dependency.hcat.HCatMessageHandler;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.service.RecoveryService.RecoveryRunnable;
+import org.apache.oozie.executor.jpa.CoordActionsDeleteJPAExecutor;
import org.apache.oozie.test.ZKXTestCase;
import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.service.RecoveryService.RecoveryRunnable;
public class TestHAPartitionDependencyManagerService extends ZKXTestCase {
@@ -255,4 +256,58 @@ public class TestHAPartitionDependencyManagerService extends ZKXTestCase {
// mytbl2 should NOT be in topic map
assertFalse(hcatService.isRegisteredForNotification(dep3));
}
+
+ public void testCheckAfterActionDelete() throws Exception {
+ Services.get().setService(ZKJobsConcurrencyService.class);
+ Services.get().get(ConfigurationService.class).getConf()
+ .setInt(PartitionDependencyManagerService.CACHE_PURGE_TTL, 0);
+
+ db = "default";
+ table1 = "mytbl";
+ table2 = "mytb2";
+ part1 = "dt=20120101;country=us";
+ part2 = "dt=20120102;country=us";
+ part3 = "dt=20120103;country=us";
+ String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table1 + "/" + part1;
+ String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table1 + "/" + part2;
+ String newHCatDependency3 = "hcat://" + server + "/" + db + "/" + table2 + "/" + part3;
+ HCatURI dep1 = new HCatURI(newHCatDependency1);
+ HCatURI dep2 = new HCatURI(newHCatDependency2);
+ HCatURI dep3 = new HCatURI(newHCatDependency3);
+ // create db, table and partitions
+ populateTable();
+
+ String actionId1 = addInitRecords(newHCatDependency1);
+ String actionId2 = addInitRecords(newHCatDependency2);
+ String actionId3 = addInitRecords(newHCatDependency3);
+
+ PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+ pdms.init(Services.get());
+ pdms.addMissingDependency(dep1, actionId1);
+ pdms.addMissingDependency(dep2, actionId2);
+ pdms.addMissingDependency(dep3, actionId3);
+ pdms.runCachePurgeWorker();
+
+ assertNotNull((Collection<String>) pdms.getWaitingActions(dep1));
+ assertNotNull((Collection<String>) pdms.getWaitingActions(dep2));
+ assertNotNull((Collection<String>) pdms.getWaitingActions(dep3));
+
+ List<String> deleteList = new ArrayList<String>();
+ deleteList.add(actionId1);
+ JPAService jpaService = Services.get().get(JPAService.class);
+ jpaService.execute(new CoordActionsDeleteJPAExecutor(deleteList));
+ pdms.runCachePurgeWorker();
+
+ assertNull((Collection<String>) pdms.getWaitingActions(dep1));
+ assertNotNull((Collection<String>) pdms.getWaitingActions(dep2));
+ assertNotNull((Collection<String>) pdms.getWaitingActions(dep3));
+ deleteList.clear();
+ deleteList.add(actionId2);
+ jpaService.execute(new CoordActionsDeleteJPAExecutor(deleteList));
+ pdms.runCachePurgeWorker();
+ assertNull((Collection<String>) pdms.getWaitingActions(dep1));
+ assertNull((Collection<String>) pdms.getWaitingActions(dep2));
+ assertNotNull((Collection<String>) pdms.getWaitingActions(dep3));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 7e85b2c..a2c0fe6 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.2.0 release (trunk - unreleased)
+OOZIE-2227 PartitionDependencyManagerService keeps on purging delete coord actions (puru)
OOZIE-2163 Remove CoordinatorStore (seoeun25 via bzhang)
OOZIE-2221 Oozie audit log has null id for some of input request (puru)
OOZIE-2223 Improve documentation with regard to Java action retries (ben.roling via bzhang)