You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/05/06 22:55:32 UTC

oozie git commit: OOZIE-2227 PartitionDependencyManagerService keeps on purging deleted coord actions

Repository: oozie
Updated Branches:
  refs/heads/master 6ea1ed833 -> 8d381099b


OOZIE-2227 PartitionDependencyManagerService keeps on purging deleted coord actions


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8d381099
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8d381099
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8d381099

Branch: refs/heads/master
Commit: 8d381099b5541986535c70050d48269869ff007a
Parents: 6ea1ed8
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Wed May 6 13:55:16 2015 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Wed May 6 13:55:16 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/oozie/BundleEngine.java     |  2 +-
 .../org/apache/oozie/CoordinatorEngine.java     |  2 +-
 .../main/java/org/apache/oozie/DagEngine.java   |  2 +-
 .../PartitionDependencyManagerService.java      | 13 ++++-
 ...TestHAPartitionDependencyManagerService.java | 57 +++++++++++++++++++-
 release-log.txt                                 |  1 +
 6 files changed, 72 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/main/java/org/apache/oozie/BundleEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java
index bfb29e6..4f0f3bf 100644
--- a/core/src/main/java/org/apache/oozie/BundleEngine.java
+++ b/core/src/main/java/org/apache/oozie/BundleEngine.java
@@ -292,7 +292,7 @@ public class BundleEngine extends BaseEngine {
             if (lastTime == null) {
                 lastTime = new Date();
             }
-            fetchLog(filter, job.getStartTime(), lastTime, writer, params, logType);
+            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
         }
         catch (Exception ex) {
             throw new IOException(ex);

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
index cb67be0..482d6a9 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
@@ -347,7 +347,7 @@ public class CoordinatorEngine extends BaseEngine {
             if (lastTime == null) {
                 lastTime = new Date();
             }
-            fetchLog(filter, job.getStartTime(), lastTime, writer, params, logType);
+            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
         }
         catch (Exception e) {
             throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/main/java/org/apache/oozie/DagEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java b/core/src/main/java/org/apache/oozie/DagEngine.java
index 08a126c..3d09206 100644
--- a/core/src/main/java/org/apache/oozie/DagEngine.java
+++ b/core/src/main/java/org/apache/oozie/DagEngine.java
@@ -458,7 +458,7 @@ public class DagEngine extends BaseEngine {
             if (lastTime == null) {
                 lastTime = job.getLastModifiedTime();
             }
-            fetchLog(filter, job.getStartTime(), lastTime, writer, params, logType);
+            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
         }
         catch (Exception e) {
             throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java b/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
index c8f2b20..a350772 100644
--- a/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
+++ b/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.service;
 
+import java.text.MessageFormat;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashSet;
@@ -27,9 +28,11 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
 import org.apache.oozie.dependency.hcat.HCatDependencyCache;
@@ -119,7 +122,15 @@ public class PartitionDependencyManagerService implements Service {
                         caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_STATUS, actionId);
                     }
                     catch (JPAExecutorException e) {
-                        LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e);
+                        if (e.getErrorCode() == ErrorCode.E0605) {
+                            LOG.info(MessageFormat.format(
+                                    "Coord action {0} is not in database, deleting it from cache", actionId));
+                            staleActions.add(actionId);
+                            actionItr.remove();
+                        }
+                        else {
+                            LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e);
+                        }
                     }
                     if(caBean != null && !caBean.getStatus().equals(CoordinatorAction.Status.WAITING)){
                         staleActions.add(actionId);

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
index 031e3e4..d681d42 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHAPartitionDependencyManagerService.java
@@ -28,9 +28,10 @@ import org.apache.oozie.client.CoordinatorAction.Status;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.dependency.hcat.HCatMessageHandler;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.service.RecoveryService.RecoveryRunnable;
+import org.apache.oozie.executor.jpa.CoordActionsDeleteJPAExecutor;
 import org.apache.oozie.test.ZKXTestCase;
 import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.service.RecoveryService.RecoveryRunnable;
 
 public class TestHAPartitionDependencyManagerService extends ZKXTestCase {
 
@@ -255,4 +256,58 @@ public class TestHAPartitionDependencyManagerService extends ZKXTestCase {
         // mytbl2 should NOT be in topic map
         assertFalse(hcatService.isRegisteredForNotification(dep3));
     }
+
+    public void testCheckAfterActionDelete() throws Exception {
+        Services.get().setService(ZKJobsConcurrencyService.class);
+        Services.get().get(ConfigurationService.class).getConf()
+                .setInt(PartitionDependencyManagerService.CACHE_PURGE_TTL, 0);
+
+        db = "default";
+        table1 = "mytbl";
+        table2 = "mytb2";
+        part1 = "dt=20120101;country=us";
+        part2 = "dt=20120102;country=us";
+        part3 = "dt=20120103;country=us";
+        String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table1 + "/" + part1;
+        String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table1 + "/" + part2;
+        String newHCatDependency3 = "hcat://" + server + "/" + db + "/" + table2 + "/" + part3;
+        HCatURI dep1 = new HCatURI(newHCatDependency1);
+        HCatURI dep2 = new HCatURI(newHCatDependency2);
+        HCatURI dep3 = new HCatURI(newHCatDependency3);
+        // create db, table and partitions
+        populateTable();
+
+        String actionId1 = addInitRecords(newHCatDependency1);
+        String actionId2 = addInitRecords(newHCatDependency2);
+        String actionId3 = addInitRecords(newHCatDependency3);
+
+        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+        pdms.init(Services.get());
+        pdms.addMissingDependency(dep1, actionId1);
+        pdms.addMissingDependency(dep2, actionId2);
+        pdms.addMissingDependency(dep3, actionId3);
+        pdms.runCachePurgeWorker();
+
+        assertNotNull((Collection<String>) pdms.getWaitingActions(dep1));
+        assertNotNull((Collection<String>) pdms.getWaitingActions(dep2));
+        assertNotNull((Collection<String>) pdms.getWaitingActions(dep3));
+
+        List<String> deleteList = new ArrayList<String>();
+        deleteList.add(actionId1);
+        JPAService jpaService = Services.get().get(JPAService.class);
+        jpaService.execute(new CoordActionsDeleteJPAExecutor(deleteList));
+        pdms.runCachePurgeWorker();
+
+        assertNull((Collection<String>) pdms.getWaitingActions(dep1));
+        assertNotNull((Collection<String>) pdms.getWaitingActions(dep2));
+        assertNotNull((Collection<String>) pdms.getWaitingActions(dep3));
+        deleteList.clear();
+        deleteList.add(actionId2);
+        jpaService.execute(new CoordActionsDeleteJPAExecutor(deleteList));
+        pdms.runCachePurgeWorker();
+        assertNull((Collection<String>) pdms.getWaitingActions(dep1));
+        assertNull((Collection<String>) pdms.getWaitingActions(dep2));
+        assertNotNull((Collection<String>) pdms.getWaitingActions(dep3));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/8d381099/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 7e85b2c..a2c0fe6 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-2227 PartitionDependencyManagerService keeps on purging delete coord actions (puru)
 OOZIE-2163 Remove CoordinatorStore (seoeun25 via bzhang)
 OOZIE-2221 Oozie audit log has null id for some of input request (puru)
 OOZIE-2223 Improve documentation with regard to Java action retries (ben.roling via bzhang)