You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/01/02 11:25:04 UTC
falcon git commit: FALCON-2197 enable and disable functionalities for
extensions
Repository: falcon
Updated Branches:
refs/heads/master 9fdfe713f -> 8054727ce
FALCON-2197 enable and disable functionalities for extensions
Author: Pracheer Agarwal <pr...@inmobi.com>
Author: Pracheer Agarwal <pr...@gmail.com>
Reviewers: @sandeepSamudrala,@pallavi-rao
Closes #328 from PracheerAgarwal/enableDisable and squashes the following commits:
f141efc [Pracheer Agarwal] back merge
dd6f9e9 [Pracheer Agarwal] back merge
7f17079 [Pracheer Agarwal] back merge
26b182f [Pracheer Agarwal] EC_UNRELATED_TYPES error fixes
5e60c83 [Pracheer Agarwal] checkstyle errors
3100641 [Pracheer Agarwal] bug fixes
e9fabdf [Pracheer Agarwal] squashing the commits
f4a08c8 [Pracheer Agarwal] # This is a combination of 3 commits. # This is the 1st commit message: FALCON-2197, status flag added for extensions for enable/disable funcnality
0793f2e [Pracheer Agarwal] FALCON-2221 back merge
62271b3 [Pracheer Agarwal] FALCON-2221 bug fixes for extension job submit
ab830e4 [Pracheer Agarwal] FALCON-2197 adding disabled check for an extension while submitting jobs
c47613a [Pracheer Agarwal] FALCON-2197 adding disabled check for an extension while submitting jobs
2d9fc53 [Pracheer Agarwal] FALCON-2197, extension flag value changed
b39aea2 [Pracheer Agarwal] FALCON-2197, status flag added for extensions for enable/disable funcnality
f885602 [Pracheer Agarwal] FALCON-2197, setting the default status flag for extensions
8a0c245 [Pracheer Agarwal] FALCON-2197, enable and disable functionalities for extensions
46042fd [Pracheer Agarwal] Merge branch 'master' of https://github.com/PracheerAgarwal/falcon
daa3ffc [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions
622cae4 [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8054727c
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8054727c
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8054727c
Branch: refs/heads/master
Commit: 8054727ce160bf7e9707cee86b6ea2b77897a26e
Parents: 9fdfe71
Author: Pracheer Agarwal <pr...@inmobi.com>
Authored: Mon Jan 2 16:54:48 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Mon Jan 2 16:54:48 2017 +0530
----------------------------------------------------------------------
.../apache/falcon/cli/FalconExtensionCLI.java | 10 ++-
.../falcon/client/AbstractFalconClient.java | 14 ++++
.../org/apache/falcon/client/FalconClient.java | 16 +++-
.../falcon/extensions/ExtensionStatus.java | 37 +++++++++
.../falcon/persistence/ExtensionBean.java | 18 ++++-
.../persistence/PersistenceConstants.java | 1 +
.../extensions/jdbc/ExtensionMetaStore.java | 15 ++++
.../falcon/extensions/store/ExtensionStore.java | 27 +++++++
.../extensions/store/ExtensionStoreTest.java | 37 +++++++++
.../resource/AbstractExtensionManager.java | 20 ++++-
.../resource/proxy/ExtensionManagerProxy.java | 82 ++++++++++++++++----
.../apache/falcon/unit/FalconUnitClient.java | 10 +++
.../falcon/unit/LocalExtensionManager.java | 9 ++-
13 files changed, 278 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index aa436da..0343aa8 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -52,6 +52,8 @@ public class FalconExtensionCLI {
public static final String UNREGISTER_OPT = "unregister";
public static final String DETAIL_OPT = "detail";
public static final String REGISTER_OPT = "register";
+ public static final String ENABLE_OPT = "enable";
+ public static final String DISABLE_OPT = "disable";
// Input parameters
public static final String EXTENSION_NAME_OPT = "extensionName";
@@ -153,10 +155,16 @@ public class FalconExtensionCLI {
commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT),
commandLine.getOptionValue(FalconCLIConstants.NUM_RESULTS_OPT));
result = instances != null ? instances.toString() : "No instance (" + jobName + ") found.";
+ } else if (optionsList.contains(ENABLE_OPT)) {
+ validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
+ result = client.enableExtension(extensionName).getMessage();
+ } else if (optionsList.contains(DISABLE_OPT)) {
+ validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
+ result = client.disableExtension(extensionName).getMessage();
} else {
throw new FalconCLIException("Invalid/missing extension command. Supported commands include "
+ "enumerate, definition, describe, list, instances, submit, submitAndSchedule, "
- + "schedule, suspend, resume, delete, update, validate. "
+ + "schedule, suspend, resume, delete, update, validate, enable, disable. "
+ "Please refer to Falcon CLI twiki for more details.");
}
OUT.get().println(result);
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index e4ce993..3181b64 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -197,6 +197,20 @@ public abstract class AbstractFalconClient {
public abstract APIResult unregisterExtension(String extensionName);
/**
+ *
+ * @param extensionName extensionName that needs to be enabled
+ * @return Result of the enableExtension operation
+ */
+ public abstract APIResult enableExtension(String extensionName);
+
+ /**
+ *
+ * @param extensionName extensionName that needs to be disabled
+ * @return Result of the disableExtension operation
+ */
+ public abstract APIResult disableExtension(String extensionName);
+
+ /**
* Prepares set of entities the extension has implemented and stage them to a local directory and submit them too.
* @param extensionName extension which is available in the store.
* @param jobName name to be used in all the extension entities' tagging that are built as part of
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 9adb142..8401c9c 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -356,7 +356,9 @@ public class FalconClient extends AbstractFalconClient {
UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_XML),
DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON),
JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.APPLICATION_JSON),
- REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML);
+ REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML),
+ ENABLE("api/extension/enable", HttpMethod.POST, MediaType.TEXT_XML),
+ DISABLE("api/extension/disable", HttpMethod.POST, MediaType.TEXT_XML);
private String path;
private String method;
@@ -1049,6 +1051,18 @@ public class FalconClient extends AbstractFalconClient {
return getResponse(APIResult.class, clientResponse);
}
+ public APIResult enableExtension(final String extensionName) {
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.ENABLE.path, extensionName).call(ExtensionOperations.ENABLE);
+ return getResponse(APIResult.class, clientResponse);
+ }
+
+ public APIResult disableExtension(final String extensionName) {
+ ClientResponse clientResponse = new ResourceBuilder()
+ .path(ExtensionOperations.DISABLE.path, extensionName).call(ExtensionOperations.DISABLE);
+ return getResponse(APIResult.class, clientResponse);
+ }
+
public APIResult getExtensionDefinition(final String extensionName) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.DEFINITION.path, extensionName)
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/common-types/src/main/java/org/apache/falcon/extensions/ExtensionStatus.java
----------------------------------------------------------------------
diff --git a/common-types/src/main/java/org/apache/falcon/extensions/ExtensionStatus.java b/common-types/src/main/java/org/apache/falcon/extensions/ExtensionStatus.java
new file mode 100644
index 0000000..8720096
--- /dev/null
+++ b/common-types/src/main/java/org/apache/falcon/extensions/ExtensionStatus.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.extensions;
+
+/**
+ * Enum to store ExtensionStatus.
+ */
+public enum ExtensionStatus {
+ ENABLED("enabled state"),
+ DISABLED("disabled state");
+
+ private final String text;
+
+ ExtensionStatus(final String text) {
+ this.text = text;
+ }
+
+ @Override
+ public String toString(){
+ return text;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java
index 79cfe16..122ae5e 100644
--- a/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java
@@ -18,6 +18,7 @@
package org.apache.falcon.persistence;
+import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.extensions.ExtensionType;
import javax.persistence.Basic;
@@ -44,7 +45,8 @@ import java.util.Date;
@NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSIONS, query = "select OBJECT(a) from ExtensionBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSIONS_OF_TYPE, query = "delete from ExtensionBean a where a.extensionType = :extensionType "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSION, query = "delete from ExtensionBean a where a.extensionName = :extensionName "),
- @NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionBean a where a.extensionName = :extensionName")
+ @NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionBean a where a.extensionName = :extensionName"),
+ @NamedQuery(name = PersistenceConstants.CHANGE_EXTENSION_STATUS, query = "update ExtensionBean a set a.status = :extensionStatus where a.extensionName = :extensionName")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
public class ExtensionBean {
@@ -79,6 +81,12 @@ public class ExtensionBean {
@Column(name = "extension_owner")
private String extensionOwner;
+ @Basic
+ @NotNull
+ @Column(name = "status")
+ @Enumerated(EnumType.STRING)
+ private ExtensionStatus status;
+
public ExtensionType getExtensionType() {
return extensionType;
}
@@ -127,4 +135,12 @@ public class ExtensionBean {
this.extensionOwner = extensionOwner;
}
+ public ExtensionStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(ExtensionStatus status) {
+ this.status = status;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index 26a5cd4..e80f7b7 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -75,6 +75,7 @@ public final class PersistenceConstants {
public static final String DELETE_EXTENSIONS_OF_TYPE = "DELETE_EXTENSIONS_OF_TYPE";
public static final String DELETE_EXTENSION = "DELETE_EXTENSION";
public static final String GET_EXTENSION = "GET_EXTENSION";
+ public static final String CHANGE_EXTENSION_STATUS = "CHANGE_EXTENSION_STATUS";
public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS";
public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB";
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
index 03f98f6..e53069a 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.falcon.extensions.jdbc;
+import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
@@ -36,6 +37,7 @@ public class ExtensionMetaStore {
private static final String EXTENSION_NAME = "extensionName";
private static final String JOB_NAME = "jobName";
private static final String EXTENSION_TYPE = "extensionType";
+ private static final String EXTENSION_STATUS = "extensionStatus";
private EntityManager getEntityManager() {
return FalconJPAService.get().getEntityManager();
@@ -50,6 +52,7 @@ public class ExtensionMetaStore {
extensionBean.setCreationTime(new Date(System.currentTimeMillis()));
extensionBean.setDescription(description);
extensionBean.setExtensionOwner(extensionOwner);
+ extensionBean.setStatus(ExtensionStatus.ENABLED);
EntityManager entityManager = getEntityManager();
try {
beginTransaction(entityManager);
@@ -229,4 +232,16 @@ public class ExtensionMetaStore {
entityManager.close();
}
}
+
+ public void updateExtensionStatus(String extensionName, ExtensionStatus status) {
+ EntityManager entityManager = getEntityManager();
+ try {
+ beginTransaction(entityManager);
+ Query q = entityManager.createNamedQuery(PersistenceConstants.CHANGE_EXTENSION_STATUS);
+ q.setParameter(EXTENSION_NAME, extensionName).setParameter(EXTENSION_STATUS, status);
+ q.executeUpdate();
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
index 13ff2d1..32b0cfd 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
@@ -23,6 +23,7 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.store.StoreAccessException;
import org.apache.falcon.extensions.AbstractExtension;
+import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.hadoop.HadoopClientFactory;
@@ -378,4 +379,30 @@ public final class ExtensionStore {
return (storePath != null);
}
+ public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws
+ FalconException {
+ validateStatusChange(extensionName, currentUser);
+ if (metaStore.getDetail(extensionName).getStatus().equals(status)) {
+ throw new ValidationException(extensionName + " is already in " + status.toString() + ".");
+ } else {
+ metaStore.updateExtensionStatus(extensionName, status);
+ return "Status of extension: " + extensionName + " changed to " + status.toString() + ".";
+ }
+ }
+
+ private void validateStatusChange(final String extensionName, String currentUser) throws FalconException {
+
+ ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extensionName) ? ExtensionType.TRUSTED
+ : ExtensionType.CUSTOM;
+ if (extensionType.equals(ExtensionType.TRUSTED)) {
+ throw new ValidationException(extensionName + " is trusted. Status can't be changed for trusted "
+ + "extensions.");
+ } else if (!metaStore.checkIfExtensionExists(extensionName)) {
+ throw new FalconException("Extension:" + extensionName + " is not registered with Falcon.");
+ } else if (!metaStore.getDetail(extensionName).getExtensionOwner().equals(currentUser)) {
+ throw new FalconException("User: " + currentUser + " is not allowed to change status of extension: "
+ + extensionName);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
----------------------------------------------------------------------
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
index 1b33e1b..b2fac5f 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.store.StoreAccessException;
+import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension;
import org.apache.falcon.hadoop.JailedFileSystem;
@@ -157,6 +158,42 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore {
store.deleteExtension("ACLFailure", "falconUser");
}
+ @Test(expectedExceptions = FalconException.class)
+ public void testStatusChangeExtensionACLFailure() throws IOException, URISyntaxException, FalconException {
+ String extensionPath = EXTENSION_PATH + "testStatusChangeACLFailure";
+ createLibs(extensionPath);
+ createReadmeAndJar(extensionPath);
+ createMETA(extensionPath);
+ store = ExtensionStore.get();
+ store.registerExtension("testStatusChangeACLFailure", STORAGE_URL + extensionPath, "test desc", "falconUser");
+ store.updateExtensionStatus("testStatusChangeACLFailure", "oozieUser", ExtensionStatus.DISABLED);
+ }
+
+ @Test(expectedExceptions = ValidationException.class)
+ public void testStatusChangeExtensionValidationFailure() throws IOException, URISyntaxException, FalconException {
+ String extensionPath = EXTENSION_PATH + "testStatusChangeValidationFailure";
+ createLibs(extensionPath);
+ createReadmeAndJar(extensionPath);
+ createMETA(extensionPath);
+ store = ExtensionStore.get();
+ store.registerExtension("testStatusChangeValidationFailure", STORAGE_URL + extensionPath, "test desc",
+ "falconUser");
+ store.updateExtensionStatus("testStatusChangeValidationFailure", "falconUser", ExtensionStatus.ENABLED);
+ }
+
+ @Test()
+ public void testStatusChangeExtension() throws IOException, URISyntaxException, FalconException {
+ String extensionPath = EXTENSION_PATH + "testStatusChange";
+ createLibs(extensionPath);
+ createReadmeAndJar(extensionPath);
+ createMETA(extensionPath);
+ store = ExtensionStore.get();
+ store.registerExtension("testStatusChange", STORAGE_URL + extensionPath, "test desc", "falconUser");
+ store.updateExtensionStatus("testStatusChange", "falconUser", ExtensionStatus.DISABLED);
+ ExtensionMetaStore metaStore = new ExtensionMetaStore();
+ Assert.assertEquals(metaStore.getDetail("testStatusChange").getStatus(), ExtensionStatus.DISABLED);
+ }
+
private void createMETA(String extensionPath) throws IOException {
Path path = new Path(extensionPath + "/META");
if (fs.exists(path)) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index f1ed6f5..63bf1b6 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -23,6 +23,7 @@ import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
@@ -35,7 +36,6 @@ import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
@@ -174,6 +174,24 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
return tags.substring(nameStart, nameEnd);
}
+ public String disableExtension(String extensionName, String currentUser) {
+ validateExtensionName(extensionName);
+ try {
+ return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.DISABLED);
+ } catch (Throwable e) {
+ throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ public String enableExtension(String extensionName, String currentUser) {
+ validateExtensionName(extensionName);
+ try {
+ return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.ENABLED);
+ } catch (Throwable e) {
+ throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
private JSONObject buildExtensionDetailResult(final String extensionName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 551dbbf..2b5cbe7 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -30,9 +30,10 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.extensions.Extension;
-import org.apache.falcon.extensions.ExtensionProperties;
+import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.extensions.ExtensionService;
import org.apache.falcon.extensions.ExtensionType;
+import org.apache.falcon.extensions.ExtensionProperties;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
import org.apache.falcon.persistence.ExtensionBean;
@@ -192,6 +193,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
public APIResult schedule(@PathParam("job-name") String jobName,
@DefaultValue("") @QueryParam("doAs") String doAsUser) {
checkIfExtensionServiceIsEnabled();
+ checkIfExtensionIsEnabled(ExtensionStore.getMetaStore().getExtensionJobDetails(jobName).getExtensionName());
try {
List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
if (entities.isEmpty()) {
@@ -307,8 +309,9 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
@FormDataParam("feeds") List<FormDataBodyPart> feedForms,
@FormDataParam("config") InputStream config) {
checkIfExtensionServiceIsEnabled();
+ checkIfExtensionIsEnabled(extensionName);
+ checkIfExtensionJobExists(jobName, extensionName);
SortedMap<EntityType, List<Entity>> entityMap;
-
try {
entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
submitEntities(extensionName, jobName, entityMap, config, request);
@@ -376,6 +379,8 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
@FormDataParam("feeds") List<FormDataBodyPart> feedForms,
@FormDataParam("config") InputStream config) {
checkIfExtensionServiceIsEnabled();
+ checkIfExtensionIsEnabled(extensionName);
+ checkIfExtensionJobExists(jobName, extensionName);
SortedMap<EntityType, List<Entity>> entityMap;
try {
entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
@@ -445,14 +450,13 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
List<String> feedNames = new ArrayList<>();
List<String> processNames = new ArrayList<>();
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ byte[] configBytes = null;
+ if (configStream != null) {
+ configBytes = IOUtils.toByteArray(configStream);
+ }
for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
for (final Entity entity : entry.getValue()) {
- final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request);
- final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity);
- entityProxyUtil.proxySubmit(entity.getEntityType().toString(), bufferedRequest, entity, colos);
- if (!embeddedMode) {
- super.submit(bufferedRequest, entity.getEntityType().toString(), currentColo);
- }
if (entity.getEntityType().equals(EntityType.FEED)) {
feedNames.add(entity.getName());
} else {
@@ -460,13 +464,18 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
}
}
}
+ metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
- ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
- byte[] configBytes = null;
- if (configStream != null) {
- configBytes = IOUtils.toByteArray(configStream);
+ for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
+ for(final Entity entity : entry.getValue()){
+ final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request);
+ final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity);
+ entityProxyUtil.proxySubmit(entity.getEntityType().toString(), bufferedRequest, entity, colos);
+ if (!embeddedMode) {
+ super.submit(bufferedRequest, entity.getEntityType().toString(), currentColo);
+ }
+ }
}
- metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
}
private void updateEntities(String extensionName, String jobName,
@@ -700,6 +709,36 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
}
}
+ @POST
+ @Path("disable/{extension-name}")
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Produces(MediaType.TEXT_PLAIN)
+ public APIResult disableExtension(
+ @PathParam("extension-name") String extensionName) {
+ checkIfExtensionServiceIsEnabled();
+ try {
+ return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName,
+ CurrentUser.getUser()));
+ } catch (Throwable e) {
+ throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ @POST
+ @Path("enable/{extension-name}")
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Produces(MediaType.TEXT_PLAIN)
+ public APIResult enableExtension(
+ @PathParam("extension-name") String extensionName) {
+ checkIfExtensionServiceIsEnabled();
+ try {
+ return new APIResult(APIResult.Status.SUCCEEDED, super.enableExtension(extensionName,
+ CurrentUser.getUser()));
+ } catch (Throwable e) {
+ throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
private List<Entity> generateEntities(String extensionName, InputStream configStream)
throws FalconException, IOException {
// get entities for extension job
@@ -731,4 +770,21 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
ExtensionService.SERVICE_NAME + " is not enabled.", Response.Status.NOT_FOUND);
}
}
+
+ private static void checkIfExtensionIsEnabled(String extensionName) {
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ if (metaStore.getDetail(extensionName).getStatus().equals(ExtensionStatus.DISABLED)) {
+ throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.",
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private static void checkIfExtensionJobExists(String jobName, String extensionName) {
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+ if (extensionJobsBean != null && !extensionJobsBean.getExtensionName().equals(extensionName)) {
+ throw FalconWebException.newAPIException("Extension job with name: " + jobName + " already exists.",
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 9ed2a0d..2a40611 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -282,6 +282,16 @@ public class FalconUnitClient extends AbstractFalconClient {
}
@Override
+ public APIResult enableExtension(String extensionName) {
+ return localExtensionManager.enableExtension(extensionName);
+ }
+
+ @Override
+ public APIResult disableExtension(String extensionName) {
+ return localExtensionManager.disableExtension(extensionName);
+ }
+
+ @Override
public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) {
InputStream configStream = getServletInputStream(configPath);
http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index 0412ef2..1e9b15a 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -140,6 +140,14 @@ public class LocalExtensionManager extends AbstractExtensionManager {
return super.getExtensionJobDetail(jobName);
}
+ public APIResult disableExtension(String extensionName) {
+ return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName, CurrentUser.getUser()));
+ }
+
+ public APIResult enableExtension(String extensionName) {
+ return new APIResult(APIResult.Status.SUCCEEDED, super.enableExtension(extensionName, CurrentUser.getUser()));
+ }
+
public APIResult getExtensionDetails(String extensionName){
return super.getExtensionDetail(extensionName);
}
@@ -147,5 +155,4 @@ public class LocalExtensionManager extends AbstractExtensionManager {
public APIResult getExtensions(){
return super.getExtensions();
}
-
}