You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/01/10 05:48:37 UTC
falcon git commit: FALCON-2233 Changes in falcon unit to
enable/disable extensions
Repository: falcon
Updated Branches:
refs/heads/master ecf9aee3e -> 44da87023
FALCON-2233 Changes in falcon unit to enable/disable extensions
1. Changes in falcon unit to enable/disable extensions
2. Adding enable/disable api in server
3. Check for extension and its job existence
4. A few indentation changes
Author: Pracheer Agarwal <pr...@gmail.com>
Author: Pracheer Agarwal <pr...@inmobi.com>
Author: Pracheer Agarwal <pr...@im2216-x0.corp.inmobi.com>
Reviewers: @sandeepSamudrala
Closes #337 from PracheerAgarwal/FALCON-2233 and squashes the following commits:
9f4f62d [Pracheer Agarwal] checkstyle errors checked
4686083 [Pracheer Agarwal] back merge
28eff42 [Pracheer Agarwal] review comments addressed
e708436 [Pracheer Agarwal] FALCON-2233, Changes in falcon unit to enable/disable extensions
946efc0 [Pracheer Agarwal] check for extension and its job existence
2ce8bf0 [Pracheer Agarwal] adding enable/disable api in server
066c8e2 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
46042fd [Pracheer Agarwal] Merge branch 'master' of https://github.com/PracheerAgarwal/falcon
daa3ffc [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions
622cae4 [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/44da8702
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/44da8702
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/44da8702
Branch: refs/heads/master
Commit: 44da87023dbbcc19aa3c1e4037dc0aff2cd51087
Parents: ecf9aee
Author: Pracheer Agarwal <pr...@gmail.com>
Authored: Tue Jan 10 11:18:28 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Tue Jan 10 11:18:28 2017 +0530
----------------------------------------------------------------------
.../resource/AbstractExtensionManager.java | 19 ++++++++++
.../resource/proxy/ExtensionManagerProxy.java | 39 +++++++++-----------
.../falcon/unit/LocalExtensionManager.java | 31 +++++++++-------
.../apache/falcon/unit/FalconUnitTestBase.java | 8 ++++
.../org/apache/falcon/unit/TestFalconUnit.java | 11 ++++++
.../falcon/resource/ExtensionManager.java | 24 +++++++++++-
6 files changed, 96 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/44da8702/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index ff89682..f7bf8e8 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -213,4 +213,23 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
}
return results;
}
+
+ protected static void checkIfExtensionIsEnabled(String extensionName) {
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ if (!metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) {
+ LOG.error("Extension: " + extensionName + " is in disabled state.");
+ throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.",
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ protected static void checkIfExtensionJobNameExists(String jobName, String extensionName) {
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+ if (extensionJobsBean != null && !extensionJobsBean.getExtensionName().equals(extensionName)) {
+ LOG.error("Extension job with name: " + extensionName + " already exists.");
+ throw FalconWebException.newAPIException("Extension job with name: " + extensionName + " already exists.",
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/44da8702/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 6e0b02f..6c9087e 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -30,7 +30,6 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.extensions.Extension;
-import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.extensions.ExtensionService;
import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.extensions.ExtensionProperties;
@@ -334,7 +333,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
@FormDataParam("config") InputStream config) {
checkIfExtensionServiceIsEnabled();
checkIfExtensionIsEnabled(extensionName);
- checkIfExtensionJobExists(jobName, extensionName);
+ checkIfExtensionJobNameExists(jobName, extensionName);
SortedMap<EntityType, List<Entity>> entityMap;
try {
entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
@@ -381,12 +380,24 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
private ExtensionType getExtensionType(String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionBean extensionDetails = metaStore.getDetail(extensionName);
+ if (extensionDetails == null) {
+ // return failure if the extension job doesn't exist
+ LOG.error("Extension not found: " + extensionName);
+ throw FalconWebException.newAPIException("Extension not found:" + extensionName,
+ Response.Status.NOT_FOUND);
+ }
return extensionDetails.getExtensionType();
}
private String getExtensionName(String jobName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean extensionJobDetails = metaStore.getExtensionJobDetails(jobName);
+ if (extensionJobDetails == null) {
+ // return failure if the extension job doesn't exist
+ LOG.error("Extension job not found: " + jobName);
+ throw FalconWebException.newAPIException("Extension Job not found:" + jobName,
+ Response.Status.NOT_FOUND);
+ }
return extensionJobDetails.getExtensionName();
}
@@ -405,7 +416,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
@FormDataParam("config") InputStream config) {
checkIfExtensionServiceIsEnabled();
checkIfExtensionIsEnabled(extensionName);
- checkIfExtensionJobExists(jobName, extensionName);
+ checkIfExtensionJobNameExists(jobName, extensionName);
SortedMap<EntityType, List<Entity>> entityMap;
SortedMap<EntityType, List<String>> entityNameMap;
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
@@ -592,6 +603,10 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
String extensionName = getExtensionName(jobName);
try {
entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
+ if (entityMap.get(EntityType.FEED).isEmpty() && entityMap.get(EntityType.PROCESS).isEmpty()) {
+ // return failure if the extension job doesn't exist
+ return new APIResult(APIResult.Status.FAILED, "Extension job " + jobName + " doesn't exist.");
+ }
updateEntities(extensionName, jobName, entityMap, config, request);
} catch (FalconException | IOException | JAXBException e) {
LOG.error("Error while updating extension job: " + jobName, e);
@@ -625,7 +640,6 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
return new APIResult(APIResult.Status.SUCCEEDED, "Validated successfully");
}
-
// Extension store related REST API's
@GET
@Path("enumerate")
@@ -785,22 +799,5 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
}
}
- private static void checkIfExtensionIsEnabled(String extensionName) {
- ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
- if (!metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) {
- LOG.error("Extension: " + extensionName + " is in disabled state.");
- throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.",
- Response.Status.INTERNAL_SERVER_ERROR);
- }
- }
- private static void checkIfExtensionJobExists(String jobName, String extensionName) {
- ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
- ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
- if (extensionJobsBean != null && !extensionJobsBean.getExtensionName().equals(extensionName)) {
- LOG.error("Extension job with name: " + extensionName + " already exists.");
- throw FalconWebException.newAPIException("Extension job with name: " + extensionName + " already exists.",
- Response.Status.INTERNAL_SERVER_ERROR);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/44da8702/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index a32dbfa..a7303b1 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -39,27 +39,30 @@ import java.util.SortedMap;
/**
* A proxy implementation of the extension operations in local mode.
*/
-class LocalExtensionManager extends AbstractExtensionManager {
- LocalExtensionManager() {}
+public class LocalExtensionManager extends AbstractExtensionManager {
+ LocalExtensionManager() {
+ }
APIResult submitExtensionJob(String extensionName, String jobName, InputStream configStream,
- SortedMap<EntityType, List<Entity>> entityMap)
- throws FalconException, IOException {
+ SortedMap<EntityType, List<Entity>> entityMap) throws FalconException, IOException {
+ checkIfExtensionIsEnabled(extensionName);
+ checkIfExtensionJobNameExists(jobName, extensionName);
for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
for (Entity entity : entry.getValue()) {
submitInternal(entity, "falconUser");
}
}
storeExtension(extensionName, jobName, configStream, entityMap);
-
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully" + jobName);
}
APIResult submitAndSchedulableExtensionJob(String extensionName, String jobName, InputStream configStream,
SortedMap<EntityType, List<Entity>> entityMap)
throws FalconException, IOException {
- for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
- for(Entity entity : entry.getValue()){
+ checkIfExtensionIsEnabled(extensionName);
+ checkIfExtensionJobNameExists(jobName, extensionName);
+ for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
+ for (Entity entity : entry.getValue()) {
submitInternal(entity, "falconUser");
}
}
@@ -96,7 +99,8 @@ class LocalExtensionManager extends AbstractExtensionManager {
}
APIResult scheduleExtensionJob(String jobName, String coloExpr, String doAsUser)
- throws FalconException, IOException{
+ throws FalconException, IOException {
+ checkIfExtensionIsEnabled(ExtensionStore.getMetaStore().getExtensionJobDetails(jobName).getExtensionName());
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean);
@@ -122,8 +126,7 @@ class LocalExtensionManager extends AbstractExtensionManager {
}
APIResult updateExtensionJob(String extensionName, String jobName, InputStream configStream,
- SortedMap<EntityType, List<Entity>> entityMap)
- throws FalconException, IOException {
+ SortedMap<EntityType, List<Entity>> entityMap) throws FalconException, IOException {
List<String> feedNames = new ArrayList<>();
List<String> processNames = new ArrayList<>();
for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
@@ -180,7 +183,7 @@ class LocalExtensionManager extends AbstractExtensionManager {
return super.deleteExtensionMetadata(extensionName);
}
- APIResult getExtensionJobDetails(String jobName){
+ APIResult getExtensionJobDetails(String jobName) {
return super.getExtensionJobDetail(jobName);
}
@@ -189,14 +192,14 @@ class LocalExtensionManager extends AbstractExtensionManager {
}
APIResult enableExtension(String extensionName) {
- return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName, CurrentUser.getUser()));
+ return new APIResult(APIResult.Status.SUCCEEDED, super.enableExtension(extensionName, CurrentUser.getUser()));
}
- APIResult getExtensionDetails(String extensionName){
+ APIResult getExtensionDetails(String extensionName) {
return super.getExtensionDetail(extensionName);
}
- public APIResult getExtensions(){
+ public APIResult getExtensions() {
return super.getExtensions();
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/44da8702/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index 9e836e7..0dd09c1 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -224,6 +224,14 @@ public class FalconUnitTestBase {
return falconUnitClient.registerExtension(extensionName, packagePath, description).getMessage();
}
+ public String disableExtension(String extensionName) {
+ return falconUnitClient.disableExtension(extensionName).getMessage();
+ }
+
+ public String enableExtension(String extensionName) {
+ return falconUnitClient.enableExtension(extensionName).getMessage();
+ }
+
public String getExtensionJobDetails(String jobName) {
return falconUnitClient.getExtensionJobDetails(jobName).getMessage();
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/44da8702/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 5717fc2..29cfed4 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -444,8 +444,19 @@ public class TestFalconUnit extends FalconUnitTestBase {
String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION);
Assert.assertEquals(result, "Extension :testExtension registered successfully.");
+ disableExtension(TEST_EXTENSION);
createDir(PROCESS_APP_PATH);
copyExtensionJar(packageBuildLib);
+
+ try {
+ submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
+ Assert.fail("Should have thrown a FalconWebException");
+ } catch (FalconWebException e) {
+ Assert.assertEquals(((APIResult) e.getResponse().getEntity()).getMessage(), "Extension: "
+ + TEST_EXTENSION + " is in disabled state.");
+ }
+ enableExtension(TEST_EXTENSION);
+
APIResult apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
assertStatus(apiResult);
result = getExtensionJobDetails(TEST_JOB);
http://git-wip-us.apache.org/repos/asf/falcon/blob/44da8702/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
index ac05b0f..b1ab513 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
@@ -157,8 +157,30 @@ public class ExtensionManager extends AbstractExtensionManager {
@Produces({MediaType.APPLICATION_JSON})
public APIResult getExtensionDefinition(
@PathParam("extension-name") String extensionName) {
- LOG.error("Definition is not supported on Server.Please run your operation on Prism ");
+ LOG.error("Definition is not supported on Server. Please run your operation on Prism ");
throw FalconWebException.newAPIException("Definition is not supported on Server. Please run your operation "
+ "on Prism.");
}
+
+ @GET
+ @Path("enable/{extension-name}")
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Produces(MediaType.TEXT_PLAIN)
+ public APIResult enableExtension(
+ @PathParam("extension-name") String extensionName) {
+ LOG.error("Enable extension is not supported on Server. Please run your operation on Prism ");
+ throw FalconWebException.newAPIException("Enable extension is not supported on Server. Please run your "
+ + "operation on Prism.");
+ }
+
+ @GET
+ @Path("disable/{extension-name}")
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Produces(MediaType.TEXT_PLAIN)
+ public APIResult disableExtension(
+ @PathParam("extension-name") String extensionName) {
+ LOG.error("Disable extension is not supported on Server. Please run your operation on Prism ");
+ throw FalconWebException.newAPIException("Disable extension is not supported on Server. Please run your "
+ + "operation on Prism.");
+ }
}