You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/10/23 05:52:50 UTC

[16/26] falcon git commit: FALCON-2293 falcon extension fails for deletion if 2 extensions have same entity name

FALCON-2293 falcon extension fails for deletion if 2 extensions have same entity name

Author: sandeep <sa...@gmail.com>
Author: sandeep.samudrala <sa...@gmail.com>

Reviewers: @pallavi-rao

Closes #378 from sandeepSamudrala/FALCON-2293 and squashes the following commits:

478700c [sandeep.samudrala] FALCON-2293 Removed unused import
d506de6 [sandeep] FALCON-2285 Addressed review comments
873889a [sandeep] FALCON-2285 Modified error message
aa2390b [sandeep] FALCON-2293 falcon extension fails for deletion if 2 extensions have same entity name
d0e39e8 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
85750dd [sandeep] Merge branch 'master' of https://github.com/apache/falcon
432a03a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0780363 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a3bd0e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
db425c5 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
3f67fed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
cb2b00d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
79e8d64 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
7de7798 [sandeep] go -b FALCON-2263Merge branch 'master' of https://github.com/apache/falcon
c5da0a2 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
7e16263 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a234d94 [sandeep] FALCON-2231 Incorporated review comments and small fixes for duplicate submission and colo addition to schedule command
26e3350 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
73fbf75 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes

(cherry picked from commit 52f308aa262c413c76b7d06bb159930d367ea3a3)
Signed-off-by: Pallavi Rao <pa...@inmobi.com>


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f1a8106a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f1a8106a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f1a8106a

Branch: refs/heads/master
Commit: f1a8106a228691647480e62669d27c02f249ccfa
Parents: 51f1ec9
Author: sandeep <sa...@gmail.com>
Authored: Thu Mar 23 08:44:31 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Mar 23 08:44:55 2017 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/ExtensionHandler.java     |  2 +-
 .../resource/AbstractExtensionManager.java      | 20 ++++++++++++++++++++
 .../resource/proxy/ExtensionManagerProxy.java   | 15 ++++++++-------
 .../falcon/unit/LocalExtensionManager.java      |  6 ++++++
 .../org/apache/falcon/unit/TestFalconUnit.java  |  7 +++++++
 5 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/f1a8106a/client/src/main/java/org/apache/falcon/ExtensionHandler.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
index 769cd78..3961e46 100644
--- a/client/src/main/java/org/apache/falcon/ExtensionHandler.java
+++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
@@ -148,7 +148,7 @@ public final class ExtensionHandler {
 
     private static String createStagePath(String extensionName, String jobName) {
         String stagePath = TMP_BASE_DIR + File.separator + extensionName + PATH_SEPARATOR + jobName
-                + PATH_SEPARATOR + System.currentTimeMillis()/1000;
+                + PATH_SEPARATOR + System.currentTimeMillis();
         File tmpPath = new File(stagePath);
         if (tmpPath.mkdir()) {
             throw new FalconCLIException("Failed to create stage directory" + tmpPath.toString());

http://git-wip-us.apache.org/repos/asf/falcon/blob/f1a8106a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index 4ffeb95..2131996 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -23,6 +23,7 @@ import org.apache.falcon.FalconWebException;
 import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.extensions.ExtensionStatus;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
@@ -276,6 +277,25 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
         }
     }
 
+    protected static void checkIfPartOfAnotherExtension(String entityName, EntityType entityType, String jobName)
+        throws FalconException {
+        try {
+            Entity entity = EntityUtil.getEntity(entityType, entityName);
+            String extractedJobName = AbstractExtensionManager.getJobNameFromTag(entity.getTags());
+            if (StringUtils.isBlank(extractedJobName)) {
+                LOG.error("Entity:{} is already submitted", entity.getName());
+                throw FalconWebException.newAPIException("Entity:" + entity.getName() + " is already submitted",
+                        Response.Status.INTERNAL_SERVER_ERROR);
+            } else if (!extractedJobName.equals(jobName)) {
+                LOG.error("Entity: {} is part another extension job:{}", entity.getName(), extractedJobName);
+                throw FalconWebException.newAPIException("Entity:" + entity.getName() +" is part another extension job:"
+                        + extractedJobName, Response.Status.INTERNAL_SERVER_ERROR);
+            }
+        } catch (EntityNotRegisteredException ignored) {
+            //Valid. Ignore if its not submitted already.
+        }
+    }
+
     protected static ExtensionBean getExtensionIfExists(String extensionName) {
         ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
         ExtensionBean extensionBean = metaStore.getDetail(extensionName);

http://git-wip-us.apache.org/repos/asf/falcon/blob/f1a8106a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 9808892..e05e2e3 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -420,8 +420,8 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
         throws FalconException, IOException, JAXBException {
         List<Entity> feeds = entityMap.get(EntityType.FEED);
         List<Entity> processes = entityMap.get(EntityType.PROCESS);
-        validateFeeds(feeds);
-        validateProcesses(processes);
+        validateFeeds(feeds, jobName);
+        validateProcesses(processes, jobName);
         List<String> feedNames = new ArrayList<>();
         List<String> processNames = new ArrayList<>();
 
@@ -458,8 +458,8 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
                                 HttpServletRequest request) throws FalconException, IOException, JAXBException {
         List<Entity> feeds = entityMap.get(EntityType.FEED);
         List<Entity> processes = entityMap.get(EntityType.PROCESS);
-        validateFeeds(feeds);
-        validateProcesses(processes);
+        validateFeeds(feeds, jobName);
+        validateProcesses(processes, jobName);
         List<String> feedNames = new ArrayList<>();
         List<String> processNames = new ArrayList<>();
 
@@ -503,16 +503,17 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
         return getBufferedRequest(new HttpServletRequestInputStreamWrapper(request, servletInputStream));
     }
 
-
-    private void validateFeeds(List<Entity> feeds) throws FalconException {
+    private void validateFeeds(List<Entity> feeds, String jobName) throws FalconException {
         for (Entity feed : feeds) {
+            checkIfPartOfAnotherExtension(feed.getName(), EntityType.FEED, jobName);
             super.validate(feed);
         }
     }
 
-    private void validateProcesses(List<Entity> processes) throws FalconException {
+    private void validateProcesses(List<Entity> processes, String jobName) throws FalconException {
         ProcessEntityParser processEntityParser = new ProcessEntityParser();
         for (Entity process : processes) {
+            checkIfPartOfAnotherExtension(process.getName(), EntityType.PROCESS, jobName);
             processEntityParser.validate((Process) process, false);
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f1a8106a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index 8936225..f196736 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -20,6 +20,7 @@ package org.apache.falcon.unit;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
@@ -48,8 +49,13 @@ public class LocalExtensionManager extends AbstractExtensionManager {
                                  SortedMap<EntityType, List<Entity>> entityMap) throws FalconException, IOException {
         checkIfExtensionIsEnabled(extensionName);
         checkIfExtensionJobNameExists(jobName, extensionName);
+        EntityUtil.applyTags(extensionName, jobName, entityMap.get(EntityType.FEED));
+        EntityUtil.applyTags(extensionName, jobName, entityMap.get(EntityType.PROCESS));
         for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
             for (Entity entity : entry.getValue()) {
+                checkIfPartOfAnotherExtension(entity.getName(), entity.getEntityType(), jobName);
+            }
+            for (Entity entity : entry.getValue()) {
                 submitInternal(entity, "falconUser");
             }
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f1a8106a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 861a089..6b63c23 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -74,6 +74,7 @@ public class TestFalconUnit extends FalconUnitTestBase {
     private static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources";
     private static final String EXTENSION_PROPERTIES = "extension.properties";
     private static final String TEST_JOB = "testJob";
+    private static final String TEST_JOB_DUPLICATE = "testJobDuplicate";
     private static final String TEST_EXTENSION = "testExtension";
     private FileSystem fileSystem;
 
@@ -459,6 +460,12 @@ public class TestFalconUnit extends FalconUnitTestBase {
         apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
         assertStatus(apiResult);
 
+        try {
+            submitExtensionJob(TEST_EXTENSION, TEST_JOB_DUPLICATE, null, null);
+        } catch (FalconWebException e) {
+            Assert.assertEquals(((APIResult) e.getResponse().getEntity()).getMessage(), "Entity:sample is part another "
+                    + "extension job:testJob");
+        }
         ExtensionJobList extensionJobList = getExtensionJobs(TEST_EXTENSION, null, null);
         Assert.assertEquals(extensionJobList.getNumJobs(), 1);