You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/10/23 05:52:50 UTC
[16/26] falcon git commit: FALCON-2293 falcon extension fails for
deletion if 2 extensions have same entity name
FALCON-2293 falcon extension fails for deletion if 2 extensions have same entity name
Author: sandeep <sa...@gmail.com>
Author: sandeep.samudrala <sa...@gmail.com>
Reviewers: @pallavi-rao
Closes #378 from sandeepSamudrala/FALCON-2293 and squashes the following commits:
478700c [sandeep.samudrala] FALCON-2293 Removed unused import
d506de6 [sandeep] FALCON-2285 Addressed review comments
873889a [sandeep] FALCON-2285 Modified error message
aa2390b [sandeep] FALCON-2293 falcon extension fails for deletion if 2 extensions have same entity name
d0e39e8 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
85750dd [sandeep] Merge branch 'master' of https://github.com/apache/falcon
432a03a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0780363 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a3bd0e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
db425c5 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
3f67fed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
cb2b00d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
79e8d64 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
7de7798 [sandeep] go -b FALCON-2263Merge branch 'master' of https://github.com/apache/falcon
c5da0a2 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
7e16263 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a234d94 [sandeep] FALCON-2231 Incorporated review comments and small fixes for duplicate submission and colo addition to schedule command
26e3350 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
73fbf75 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes
(cherry picked from commit 52f308aa262c413c76b7d06bb159930d367ea3a3)
Signed-off-by: Pallavi Rao <pa...@inmobi.com>
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f1a8106a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f1a8106a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f1a8106a
Branch: refs/heads/master
Commit: f1a8106a228691647480e62669d27c02f249ccfa
Parents: 51f1ec9
Author: sandeep <sa...@gmail.com>
Authored: Thu Mar 23 08:44:31 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Mar 23 08:44:55 2017 +0530
----------------------------------------------------------------------
.../org/apache/falcon/ExtensionHandler.java | 2 +-
.../resource/AbstractExtensionManager.java | 20 ++++++++++++++++++++
.../resource/proxy/ExtensionManagerProxy.java | 15 ++++++++-------
.../falcon/unit/LocalExtensionManager.java | 6 ++++++
.../org/apache/falcon/unit/TestFalconUnit.java | 7 +++++++
5 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/f1a8106a/client/src/main/java/org/apache/falcon/ExtensionHandler.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/ExtensionHandler.java b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
index 769cd78..3961e46 100644
--- a/client/src/main/java/org/apache/falcon/ExtensionHandler.java
+++ b/client/src/main/java/org/apache/falcon/ExtensionHandler.java
@@ -148,7 +148,7 @@ public final class ExtensionHandler {
private static String createStagePath(String extensionName, String jobName) {
String stagePath = TMP_BASE_DIR + File.separator + extensionName + PATH_SEPARATOR + jobName
- + PATH_SEPARATOR + System.currentTimeMillis()/1000;
+ + PATH_SEPARATOR + System.currentTimeMillis();
File tmpPath = new File(stagePath);
if (tmpPath.mkdir()) {
throw new FalconCLIException("Failed to create stage directory" + tmpPath.toString());
http://git-wip-us.apache.org/repos/asf/falcon/blob/f1a8106a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index 4ffeb95..2131996 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -23,6 +23,7 @@ import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
@@ -276,6 +277,25 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
}
}
+ protected static void checkIfPartOfAnotherExtension(String entityName, EntityType entityType, String jobName)
+ throws FalconException {
+ try {
+ Entity entity = EntityUtil.getEntity(entityType, entityName);
+ String extractedJobName = AbstractExtensionManager.getJobNameFromTag(entity.getTags());
+ if (StringUtils.isBlank(extractedJobName)) {
+ LOG.error("Entity:{} is already submitted", entity.getName());
+ throw FalconWebException.newAPIException("Entity:" + entity.getName() + " is already submitted",
+ Response.Status.INTERNAL_SERVER_ERROR);
+ } else if (!extractedJobName.equals(jobName)) {
+ LOG.error("Entity: {} is part another extension job:{}", entity.getName(), extractedJobName);
+ throw FalconWebException.newAPIException("Entity:" + entity.getName() +" is part another extension job:"
+ + extractedJobName, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ } catch (EntityNotRegisteredException ignored) {
+ //Valid. Ignore if its not submitted already.
+ }
+ }
+
protected static ExtensionBean getExtensionIfExists(String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
http://git-wip-us.apache.org/repos/asf/falcon/blob/f1a8106a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 9808892..e05e2e3 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -420,8 +420,8 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
throws FalconException, IOException, JAXBException {
List<Entity> feeds = entityMap.get(EntityType.FEED);
List<Entity> processes = entityMap.get(EntityType.PROCESS);
- validateFeeds(feeds);
- validateProcesses(processes);
+ validateFeeds(feeds, jobName);
+ validateProcesses(processes, jobName);
List<String> feedNames = new ArrayList<>();
List<String> processNames = new ArrayList<>();
@@ -458,8 +458,8 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
HttpServletRequest request) throws FalconException, IOException, JAXBException {
List<Entity> feeds = entityMap.get(EntityType.FEED);
List<Entity> processes = entityMap.get(EntityType.PROCESS);
- validateFeeds(feeds);
- validateProcesses(processes);
+ validateFeeds(feeds, jobName);
+ validateProcesses(processes, jobName);
List<String> feedNames = new ArrayList<>();
List<String> processNames = new ArrayList<>();
@@ -503,16 +503,17 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
return getBufferedRequest(new HttpServletRequestInputStreamWrapper(request, servletInputStream));
}
-
- private void validateFeeds(List<Entity> feeds) throws FalconException {
+ private void validateFeeds(List<Entity> feeds, String jobName) throws FalconException {
for (Entity feed : feeds) {
+ checkIfPartOfAnotherExtension(feed.getName(), EntityType.FEED, jobName);
super.validate(feed);
}
}
- private void validateProcesses(List<Entity> processes) throws FalconException {
+ private void validateProcesses(List<Entity> processes, String jobName) throws FalconException {
ProcessEntityParser processEntityParser = new ProcessEntityParser();
for (Entity process : processes) {
+ checkIfPartOfAnotherExtension(process.getName(), EntityType.PROCESS, jobName);
processEntityParser.validate((Process) process, false);
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f1a8106a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index 8936225..f196736 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -20,6 +20,7 @@ package org.apache.falcon.unit;
import org.apache.commons.io.IOUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
@@ -48,8 +49,13 @@ public class LocalExtensionManager extends AbstractExtensionManager {
SortedMap<EntityType, List<Entity>> entityMap) throws FalconException, IOException {
checkIfExtensionIsEnabled(extensionName);
checkIfExtensionJobNameExists(jobName, extensionName);
+ EntityUtil.applyTags(extensionName, jobName, entityMap.get(EntityType.FEED));
+ EntityUtil.applyTags(extensionName, jobName, entityMap.get(EntityType.PROCESS));
for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
for (Entity entity : entry.getValue()) {
+ checkIfPartOfAnotherExtension(entity.getName(), entity.getEntityType(), jobName);
+ }
+ for (Entity entity : entry.getValue()) {
submitInternal(entity, "falconUser");
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f1a8106a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 861a089..6b63c23 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -74,6 +74,7 @@ public class TestFalconUnit extends FalconUnitTestBase {
private static final String JARS_DIR = "file:///" + System.getProperty("user.dir") + "/src/test/resources";
private static final String EXTENSION_PROPERTIES = "extension.properties";
private static final String TEST_JOB = "testJob";
+ private static final String TEST_JOB_DUPLICATE = "testJobDuplicate";
private static final String TEST_EXTENSION = "testExtension";
private FileSystem fileSystem;
@@ -459,6 +460,12 @@ public class TestFalconUnit extends FalconUnitTestBase {
apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
assertStatus(apiResult);
+ try {
+ submitExtensionJob(TEST_EXTENSION, TEST_JOB_DUPLICATE, null, null);
+ } catch (FalconWebException e) {
+ Assert.assertEquals(((APIResult) e.getResponse().getEntity()).getMessage(), "Entity:sample is part another "
+ + "extension job:testJob");
+ }
ExtensionJobList extensionJobList = getExtensionJobs(TEST_EXTENSION, null, null);
Assert.assertEquals(extensionJobList.getNumJobs(), 1);