You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/01/19 05:23:15 UTC
falcon git commit: FALCON-2259 Unregister an extension only if no
extension jobs are dependent on the extension
Repository: falcon
Updated Branches:
refs/heads/master 6e162747b -> 3c011688b
FALCON-2259 Unregister an extension only if no extension jobs are dependent on the extension
Author: sandeep <sa...@gmail.com>
Reviewers: @pallavi-rao
Closes #342 from sandeepSamudrala/FALCON-2259 and squashes the following commits:
d317d3d [sandeep] Fixed tests
6617993 [sandeep] Incorporated review comments
46ad617 [sandeep] Incorporated review comments
c1189be [sandeep] Incorporated review comments
a70f5a9 [sandeep] FALCON-2259 Unregister an extension only if no extension jobs are dependent on the extension
7e16263 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a234d94 [sandeep] FALCON-2231 Incorporated review comments and small fixes for duplicate submission and colo addition to schedule command
26e3350 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
73fbf75 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3c011688
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3c011688
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3c011688
Branch: refs/heads/master
Commit: 3c011688bf910ee47786afa6f6c582d9c8d356ef
Parents: 6e16274
Author: sandeep <sa...@gmail.com>
Authored: Thu Jan 19 10:53:06 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Jan 19 10:53:06 2017 +0530
----------------------------------------------------------------------
.../falcon/persistence/ExtensionJobsBean.java | 3 +-
.../persistence/PersistenceConstants.java | 7 ++--
.../extensions/jdbc/ExtensionMetaStore.java | 12 ++++++
.../falcon/extensions/store/ExtensionStore.java | 12 ++++++
.../resource/AbstractExtensionManager.java | 24 +++++++++---
.../apache/falcon/unit/FalconUnitClient.java | 6 ++-
.../falcon/unit/LocalExtensionManager.java | 2 +-
.../apache/falcon/unit/FalconUnitTestBase.java | 18 ++++-----
.../org/apache/falcon/unit/TestFalconUnit.java | 39 ++++++++++----------
9 files changed, 84 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
index 15a4dac..b6ac79d 100644
--- a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
@@ -44,7 +44,8 @@ import java.util.List;
@NamedQueries({
@NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "),
- @NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName")
+ @NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"),
+ @NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
public class ExtensionJobsBean {
http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index e80f7b7..1e6a04b 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -60,9 +60,9 @@ public final class PersistenceConstants {
public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE";
public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME";
- public static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
- public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
- public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
+ static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
+ static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
+ static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";
public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE";
@@ -80,5 +80,6 @@ public final class PersistenceConstants {
public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS";
public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB";
public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB";
+ public static final String GET_JOBS_FOR_AN_EXTENSION = "GET_JOBS_FOR_AN_EXTENSION";
public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES";
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
index df5d6c9..b47766c 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
@@ -137,6 +137,18 @@ public class ExtensionMetaStore {
}
}
+ public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) {
+ EntityManager entityManager = getEntityManager();
+ beginTransaction(entityManager);
+ Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION);
+ query.setParameter(EXTENSION_NAME, extensionName);
+ try {
+ return (List<ExtensionJobsBean>)query.getResultList();
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
public void deleteExtension(String extensionName){
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
index c50d6de..c3b4feb 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
@@ -28,6 +28,7 @@ import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.persistence.ExtensionBean;
+import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -382,6 +383,17 @@ public final class ExtensionStore {
return (storePath != null);
}
+ public List<String> getJobsForAnExtension(final String extensionName) throws FalconException {
+ List<ExtensionJobsBean> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
+ List<String> extensionJobNames = new ArrayList<>();
+ if (null != extensionJobs && !extensionJobs.isEmpty()) {
+ for (ExtensionJobsBean extensionJobsBean : extensionJobs) {
+ extensionJobNames.add(extensionJobsBean.getJobName());
+ }
+ }
+ return extensionJobNames;
+ }
+
public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws
FalconException {
validateStatusChange(extensionName, currentUser);
http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index 63974f2..8ada576 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.falcon.resource;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
@@ -99,13 +100,26 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
}
}
- public APIResult deleteExtensionMetadata(String extensionName){
+ public APIResult deleteExtensionMetadata(String extensionName) {
validateExtensionName(extensionName);
+ ExtensionStore metaStore = ExtensionStore.get();
try {
- return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().deleteExtension(extensionName,
- CurrentUser.getUser()));
- } catch (Throwable e) {
- throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ canDeleteExtension(extensionName);
+ return new APIResult(APIResult.Status.SUCCEEDED,
+ metaStore.deleteExtension(extensionName, CurrentUser.getUser()));
+ } catch (FalconException e) {
+ throw FalconWebException.newAPIException(e);
+ }
+ }
+
+ private void canDeleteExtension(String extensionName) throws FalconException {
+ ExtensionStore metaStore = ExtensionStore.get();
+ List<String> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
+ if (!extensionJobs.isEmpty()) {
+ LOG.error("Extension:{} cannot be unregistered as {} are instances of the extension", extensionName,
+ ArrayUtils.toString(extensionJobs));
+ throw new FalconException("Extension:" + extensionName + " cannot be unregistered as following instances"
+ + " are dependent on the extension" + ArrayUtils.toString(extensionJobs));
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index d76dbca..9358958 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -277,7 +277,11 @@ public class FalconUnitClient extends AbstractFalconClient {
@Override
public APIResult unregisterExtension(String extensionName) {
- return localExtensionManager.unRegisterExtension(extensionName);
+ try {
+ return localExtensionManager.unRegisterExtension(extensionName);
+ } catch (FalconException e) {
+ throw new FalconCLIException("Failed in unRegistering the extension"+ e.getMessage());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index a7303b1..ca39ddb 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -179,7 +179,7 @@ public class LocalExtensionManager extends AbstractExtensionManager {
return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser());
}
- APIResult unRegisterExtension(String extensionName) {
+ APIResult unRegisterExtension(String extensionName) throws FalconException {
return super.deleteExtensionMetadata(extensionName);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index 0dd09c1..e9367d5 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -218,29 +218,29 @@ public class FalconUnitTestBase {
return props;
}
- public String registerExtension(String extensionName, String packagePath, String description)
+ APIResult registerExtension(String extensionName, String packagePath, String description)
throws IOException, FalconException {
- return falconUnitClient.registerExtension(extensionName, packagePath, description).getMessage();
+ return falconUnitClient.registerExtension(extensionName, packagePath, description);
}
- public String disableExtension(String extensionName) {
+ String disableExtension(String extensionName) {
return falconUnitClient.disableExtension(extensionName).getMessage();
}
- public String enableExtension(String extensionName) {
+ String enableExtension(String extensionName) {
return falconUnitClient.enableExtension(extensionName).getMessage();
}
- public String getExtensionJobDetails(String jobName) {
- return falconUnitClient.getExtensionJobDetails(jobName).getMessage();
+ APIResult getExtensionJobDetails(String jobName) {
+ return falconUnitClient.getExtensionJobDetails(jobName);
}
- public String unregisterExtension(String extensionName) {
- return falconUnitClient.unregisterExtension(extensionName).getMessage();
+ APIResult unregisterExtension(String extensionName) {
+ return falconUnitClient.unregisterExtension(extensionName);
}
- public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) {
+ APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) {
return falconUnitClient.submitExtensionJob(extensionName, jobName, configPath, doAsUser);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 29cfed4..72280f7 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -245,7 +245,7 @@ public class TestFalconUnit extends FalconUnitTestBase {
assertStatus(result);
}
- public void setDummyProperty(Process process) {
+ private void setDummyProperty(Process process) {
Property property = new Property();
property.setName("dummy");
property.setValue("dummy");
@@ -424,13 +424,12 @@ public class TestFalconUnit extends FalconUnitTestBase {
clearDB();
submitCluster();
createExtensionPackage();
-
- String result = registerExtension("testExtension", new Path(STORAGE_URL + EXTENSION_PATH).toString()
+ APIResult apiResult = registerExtension("testExtension", new Path(STORAGE_URL + EXTENSION_PATH).toString()
, "testExtension");
- Assert.assertEquals(result, "Extension :testExtension registered successfully.");
-
- result = unregisterExtension("testExtension");
- Assert.assertEquals(result, "Deleted extension:testExtension");
+ assertStatus(apiResult);
+ apiResult = unregisterExtension("testExtension");
+ assertStatus(apiResult);
+ Assert.assertEquals(apiResult.getMessage(), "Deleted extension:testExtension");
}
@Test
@@ -441,8 +440,8 @@ public class TestFalconUnit extends FalconUnitTestBase {
createDir(PROCESS_APP_PATH);
fs.copyFromLocalFile(new Path(getAbsolutePath(WORKFLOW)), new Path(PROCESS_APP_PATH, "workflow.xml"));
String packageBuildLib = new Path(EXTENSION_PATH, "libs/build/").toString();
- String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION);
- Assert.assertEquals(result, "Extension :testExtension registered successfully.");
+ APIResult apiResult = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION);
+ assertStatus(apiResult);
disableExtension(TEST_EXTENSION);
createDir(PROCESS_APP_PATH);
@@ -457,10 +456,10 @@ public class TestFalconUnit extends FalconUnitTestBase {
}
enableExtension(TEST_EXTENSION);
- APIResult apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
+ apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
assertStatus(apiResult);
- result = getExtensionJobDetails(TEST_JOB);
- JSONObject resultJson = new JSONObject(result);
+ apiResult = getExtensionJobDetails(TEST_JOB);
+ JSONObject resultJson = new JSONObject(apiResult.getMessage());
Assert.assertEquals(resultJson.get("extensionName"), TEST_EXTENSION);
Process process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
Assert.assertEquals(process.getPipelines(), "testPipeline");
@@ -482,7 +481,7 @@ public class TestFalconUnit extends FalconUnitTestBase {
apiResult = updateExtensionJob(TEST_JOB, getAbsolutePath(EXTENSION_PROPERTIES), null);
assertStatus(apiResult);
- String processes = new JSONObject(getExtensionJobDetails(TEST_JOB)).get("processes").toString();
+ String processes = new JSONObject(getExtensionJobDetails(TEST_JOB).getMessage()).get("processes").toString();
Assert.assertEquals(processes, "sample");
process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
Assert.assertEquals(process.getPipelines(), "testSample");
@@ -491,6 +490,12 @@ public class TestFalconUnit extends FalconUnitTestBase {
assertStatus(apiResult);
Assert.assertEquals(apiResult.getMessage(), "RUNNING");
+ try {
+ unregisterExtension(TEST_EXTENSION);
+ Assert.fail("Should have thrown a FalconCLIException");
+ } catch (FalconWebException e) {
+ //Do nothing. Exception expected as there are dependent extension jobs and so extension cannot be deleted.
+ }
apiResult = deleteExtensionJob(TEST_JOB, null);
assertStatus(apiResult);
try {
@@ -506,14 +511,10 @@ public class TestFalconUnit extends FalconUnitTestBase {
Assert.assertEquals(((APIResult) e.getResponse().getEntity()).getMessage(), "Job name not found:testJob");
//Do nothing. Exception Expected.
}
+ apiResult = unregisterExtension(TEST_EXTENSION);
+ assertStatus(apiResult);
}
- @Test
- public void testExtensionJobSuspendAndResume() throws Exception {
-
- }
-
-
private void copyExtensionJar(String destDirPath) throws IOException {
File dir = new File(new Path(JARS_DIR).toUri().toURL().getPath());
for (File file : dir.listFiles()) {