You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/05/03 20:23:48 UTC
falcon git commit: FALCON-1623 Implement safemode in Falcon Server
Repository: falcon
Updated Branches:
refs/heads/master 6d4ff0bef -> 2d51db7a0
FALCON-1623 Implement safemode in Falcon Server
Author: bvellanki <bv...@hortonworks.com>
Reviewers: "yzheng-hortonworks <yz...@hortonworks.com>, Venkat Ranganathan <ve...@hortonworks.com>, Sowmya Ramesh <so...@apache.org>"
Closes #116 from bvellanki/FALCON-1623
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2d51db7a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2d51db7a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2d51db7a
Branch: refs/heads/master
Commit: 2d51db7a02394ce6856b4c622283f5afd99398eb
Parents: 6d4ff0b
Author: bvellanki <bv...@hortonworks.com>
Authored: Tue May 3 11:23:43 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Tue May 3 11:23:43 2016 -0700
----------------------------------------------------------------------
.../org/apache/falcon/cli/FalconAdminCLI.java | 14 ++
.../org/apache/falcon/FalconCLIConstants.java | 1 +
.../org/apache/falcon/client/FalconClient.java | 67 ++++---
.../org/apache/falcon/entity/EntityUtil.java | 14 ++
.../apache/falcon/security/SecurityUtil.java | 1 +
.../apache/falcon/util/StartupProperties.java | 52 +++++-
.../workflow/engine/OozieWorkflowEngine.java | 19 ++
pom.xml | 2 +-
.../java/org/apache/falcon/FalconServer.java | 24 ++-
.../falcon/resource/AbstractEntityManager.java | 47 ++++-
.../resource/AbstractInstanceManager.java | 54 +++---
.../AbstractSchedulableEntityManager.java | 30 +--
.../falcon/resource/admin/AdminResource.java | 40 ++++
.../resource/admin/AdminResourceTest.java | 36 ++--
.../falcon/rerun/handler/LateRerunConsumer.java | 5 +-
src/bin/service-start.sh | 11 ++
.../apache/falcon/cli/FalconSafemodeCLIIT.java | 183 +++++++++++++++++++
17 files changed, 517 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java
index 56cc5b9..84439b9 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java
@@ -18,6 +18,7 @@
package org.apache.falcon.cli;
+import com.sun.jersey.api.client.ClientResponse;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
@@ -55,6 +56,8 @@ public class FalconAdminCLI extends FalconCLI {
"show the thread stack dump");
Option doAs = new Option(FalconCLIConstants.DO_AS_OPT, true,
"doAs user");
+ Option safemode = new Option(FalconCLIConstants.SAFE_MODE_OPT, true,
+ "set Falcon server safemode to true or false");
Option help = new Option("help", false, "show Falcon help");
Option debug = new Option(FalconCLIConstants.DEBUG_OPTION, false,
"Use debug mode to see debugging statements on stdout");
@@ -62,6 +65,7 @@ public class FalconAdminCLI extends FalconCLI {
group.addOption(version);
group.addOption(stack);
group.addOption(help);
+ group.addOption(safemode);
adminOptions.addOptionGroup(group);
adminOptions.addOption(doAs);
@@ -102,6 +106,16 @@ public class FalconAdminCLI extends FalconCLI {
} else if (optionsList.contains(FalconCLIConstants.VERSION_OPT)) {
result = client.getVersion(doAsUser);
OUT.get().println("Falcon server build version: " + result);
+ } else if (optionsList.contains(FalconCLIConstants.SAFE_MODE_OPT)) {
+ String safemode = commandLine.getOptionValue(FalconCLIConstants.SAFE_MODE_OPT);
+ ClientResponse response = client.setSafemode(safemode, doAsUser);
+ if (response.getStatus() == 200) {
+ OUT.get().println("Falcon server safemode set to : " + safemode);
+ } else {
+ ERR.get().println("Unable to set Falcon server to safemode value : " + safemode);
+ ERR.get().println(response.toString());
+ exitValue = -1;
+ }
} else if (optionsList.contains(FalconCLIConstants.HELP_CMD)) {
OUT.get().println("Falcon Help");
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
index 436875d..1db5cfe 100644
--- a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
+++ b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
@@ -35,6 +35,7 @@ public final class FalconCLIConstants {
public static final String ENTITY_CMD = "entity";
public static final String INSTANCE_CMD = "instance";
public static final String EXTENSION_CMD = "extension";
+ public static final String SAFE_MODE_OPT = "setsafemode";
public static final String TYPE_OPT = "type";
public static final String COLO_OPT = "colo";
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 36fb873..7a48973 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -18,30 +18,11 @@
package org.apache.falcon.client;
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URL;
-import java.security.SecureRandom;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.TrustManager;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
-
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HTTPSProperties;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.util.TrustManagerUtils;
@@ -68,11 +49,28 @@ import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.client.urlconnection.HTTPSProperties;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.security.SecureRandom;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Client API to submit and manage Falcon Entities (Cluster, Feed, Process) jobs
@@ -318,7 +316,8 @@ public class FalconClient extends AbstractFalconClient {
protected static enum AdminOperations {
STACK("api/admin/stack", HttpMethod.GET, MediaType.TEXT_PLAIN),
- VERSION("api/admin/version", HttpMethod.GET, MediaType.APPLICATION_JSON);
+ VERSION("api/admin/version", HttpMethod.GET, MediaType.APPLICATION_JSON),
+ SAFEMODE("api/admin/setSafeMode", HttpMethod.GET, MediaType.APPLICATION_JSON);
private String path;
private String method;
@@ -706,6 +705,14 @@ public class FalconClient extends AbstractFalconClient {
return clientResponse.getStatus();
}
+ public ClientResponse setSafemode(String safemode, String doAsUser) throws FalconCLIException {
+ AdminOperations job = AdminOperations.SAFEMODE;
+ ClientResponse clientResponse = new ResourceBuilder().path(job.path).path(safemode)
+ .addQueryParam(DO_AS_OPT, doAsUser).call(job);
+ printClientResponse(clientResponse);
+ return clientResponse;
+ }
+
public String getDimensionList(String dimensionType, String cluster, String doAsUser) throws FalconCLIException {
return sendMetadataDiscoveryRequest(MetadataOperations.LIST, dimensionType, null, cluster, doAsUser);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 8825a65..b181ece 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -124,6 +124,20 @@ public final class EntityUtil {
}
}
+ /**
+ * List of entity operations.
+ */
+ public enum ENTITY_OPERATION {
+ SUBMIT,
+ UPDATE,
+ SCHEDULE,
+ SUBMIT_AND_SCHEDULE,
+ DELETE,
+ SUSPEND,
+ RESUME,
+ TOUCH
+ }
+
private EntityUtil() {}
public static <T extends Entity> T getEntity(EntityType type, String entityName) throws FalconException {
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
index fe04c40..7191f72 100644
--- a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
+++ b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
@@ -141,4 +141,5 @@ public final class SecurityUtil {
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/common/src/main/java/org/apache/falcon/util/StartupProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/StartupProperties.java b/common/src/main/java/org/apache/falcon/util/StartupProperties.java
index 7522b0d..92ffa04 100644
--- a/common/src/main/java/org/apache/falcon/util/StartupProperties.java
+++ b/common/src/main/java/org/apache/falcon/util/StartupProperties.java
@@ -19,7 +19,13 @@
package org.apache.falcon.util;
import org.apache.falcon.FalconException;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
@@ -28,7 +34,14 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public final class StartupProperties extends ApplicationProperties {
+ public static final String SAFEMODE_PROPERTY = "falcon.safeMode";
+ private static final String SAFEMODE_FILE = ".safemode";
+ private static final String CONFIGSTORE_PROPERTY = "config.store.uri";
+ private static FileSystem fileSystem;
+ private static Path storePath;
+
private static final String PROPERTY_FILE = "startup.properties";
+ private static final Logger LOG = LoggerFactory.getLogger(StartupProperties.class);
private static final AtomicReference<StartupProperties> INSTANCE =
new AtomicReference<StartupProperties>();
@@ -46,10 +59,47 @@ public final class StartupProperties extends ApplicationProperties {
try {
if (INSTANCE.get() == null) {
INSTANCE.compareAndSet(null, new StartupProperties());
+ storePath = new Path((INSTANCE.get().getProperty(CONFIGSTORE_PROPERTY)));
+ fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
+ String isSafeMode = (doesSafemodeFileExist()) ? "true" : "false";
+ LOG.info("Initializing Falcon StartupProperties with safemode set to {}.", isSafeMode);
+ INSTANCE.get().setProperty(SAFEMODE_PROPERTY, isSafeMode);
}
return INSTANCE.get();
} catch (FalconException e) {
- throw new RuntimeException("Unable to read application " + "startup properties", e);
+ throw new RuntimeException("Unable to read application startup properties", e);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to verify Falcon safemode", e);
}
}
+
+ public static void createSafemodeFile() throws IOException {
+ Path safemodeFilePath = getSafemodeFilePath();
+ if (!doesSafemodeFileExist()) {
+ boolean success = fileSystem.createNewFile(safemodeFilePath);
+ if (!success) {
+ LOG.error("Failed to create safemode file at {}", safemodeFilePath.toUri());
+ throw new IOException("Failed to create safemode file at " + safemodeFilePath.toUri());
+ }
+ }
+ INSTANCE.get().setProperty(SAFEMODE_PROPERTY, "true");
+ }
+
+ public static boolean deleteSafemodeFile() throws IOException {
+ INSTANCE.get().setProperty(SAFEMODE_PROPERTY, "false");
+ return !doesSafemodeFileExist() || fileSystem.delete(getSafemodeFilePath(), true);
+ }
+
+ public static boolean doesSafemodeFileExist() throws IOException {
+ return fileSystem.exists(getSafemodeFilePath());
+ }
+
+ private static Path getSafemodeFilePath() {
+ return new Path(storePath, SAFEMODE_FILE);
+ }
+
+ public static boolean isServerInSafeMode() {
+ return Boolean.parseBoolean(StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false"));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 05d5ef9..6b87b38 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -150,6 +150,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
@Override
public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> suppliedProps) throws FalconException {
+ if (StartupProperties.isServerInSafeMode()) {
+ throwSafemodeException("SCHEDULE");
+ }
Map<String, BundleJob> bundleMap = findLatestBundle(entity);
List<String> schedClusters = new ArrayList<String>();
for (Map.Entry<String, BundleJob> entry : bundleMap.entrySet()) {
@@ -181,6 +184,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
+ private void throwSafemodeException(String operation) throws FalconException {
+ String error = "Workflow Engine does not allow " + operation + " operation when Falcon server is in safemode";
+ LOG.error(error);
+ throw new FalconException(error);
+ }
+
/**
* Prepare the staging and logs dir for this entity with default permissions.
*
@@ -204,6 +213,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
@Override
public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException {
+ if (StartupProperties.isServerInSafeMode()) {
+ throwSafemodeException("DRYRUN");
+ }
OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
Path buildPath = new Path("/tmp", "falcon" + entity.getName() + System.currentTimeMillis());
Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
@@ -413,6 +425,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private String doBundleAction(Entity entity, BundleAction action) throws FalconException {
+ if (StartupProperties.isServerInSafeMode() && !action.equals(BundleAction.SUSPEND)) {
+ throwSafemodeException(action.name());
+ }
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
String result = null;
for (String cluster : clusters) {
@@ -637,6 +652,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end,
Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+ if (StartupProperties.isServerInSafeMode()
+ && (action.equals(JobAction.RERUN) || action.equals(JobAction.RESUME))) {
+ throwSafemodeException(action.name());
+ }
return doJobAction(action, entity, start, end, props, lifeCycles, null);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a72889e..54863b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
<activeByDefault>true</activeByDefault>
</activation>
<properties>
- <hadoop.version>2.7.1</hadoop.version>
+ <hadoop.version>2.6.2</hadoop.version>
</properties>
<dependencyManagement>
<dependencies>
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/FalconServer.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/FalconServer.java b/prism/src/main/java/org/apache/falcon/FalconServer.java
index ea341b3..3d9879a 100644
--- a/prism/src/main/java/org/apache/falcon/FalconServer.java
+++ b/prism/src/main/java/org/apache/falcon/FalconServer.java
@@ -27,9 +27,9 @@ import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.util.BuildProperties;
import org.apache.falcon.util.EmbeddedServer;
+import org.apache.falcon.util.StartupProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.falcon.util.StartupProperties;
/**
* Driver for running Falcon as a standalone server with embedded jetty server.
@@ -38,6 +38,8 @@ public final class FalconServer {
private static final Logger LOG = LoggerFactory.getLogger(FalconServer.class);
private static final String APP_PATH = "app";
private static final String APP_PORT = "port";
+ private static final String SAFE_MODE = "setsafemode";
+
private static EmbeddedServer server;
private static BrokerService broker;
@@ -59,6 +61,10 @@ public final class FalconServer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option(SAFE_MODE, true, "Application mode, start safemode if true");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return new GnuParser().parse(options, args);
}
@@ -88,6 +94,16 @@ public final class FalconServer {
appPath = cmd.getOptionValue(APP_PATH);
}
+ if (cmd.hasOption(SAFE_MODE)) {
+ validateSafemode(cmd.getOptionValue(SAFE_MODE));
+ boolean isSafeMode = Boolean.parseBoolean(cmd.getOptionValue(SAFE_MODE));
+ if (isSafeMode) {
+ StartupProperties.createSafemodeFile();
+ } else {
+ StartupProperties.deleteSafemodeFile();
+ }
+ }
+
final String enableTLSFlag = StartupProperties.get().getProperty("falcon.enableTLS");
final int appPort = getApplicationPort(cmd, enableTLSFlag);
final boolean enableTLS = isTLSEnabled(enableTLSFlag, appPort);
@@ -102,6 +118,12 @@ public final class FalconServer {
server.start();
}
+ private static void validateSafemode(String isSafeMode) throws Exception {
+ if (!("true".equals(isSafeMode) || "false".equals(isSafeMode))) {
+ throw new Exception("Invalid value for argument safemode. Allowed values are \"true\" or \"false\"");
+ }
+ }
+
private static int getApplicationPort(CommandLine cmd, String enableTLSFlag) {
final int appPort;
if (cmd.hasOption(APP_PORT)) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index c119f23..b319dd1 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -46,6 +46,7 @@ import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.io.IOUtils;
@@ -226,8 +227,9 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
Entity entity = deserializeEntity(inputStream, entityType);
validate(entity);
- //Validate that the entity can be scheduled in the cluster
- if (entity.getEntityType().isSchedulable()) {
+ // Validate that the entity can be scheduled in the cluster.
+ // Perform dryrun only if falcon is not in safemode.
+ if (entity.getEntityType().isSchedulable() && !StartupProperties.isServerInSafeMode()) {
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
for (String cluster : clusters) {
try {
@@ -266,6 +268,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
String removedFromEngine = "";
try {
Entity entityObj = EntityUtil.getEntity(type, entity);
+ verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.DELETE);
canRemove(entityObj);
obtainEntityLocks(entityObj, "delete", tokenList);
@@ -307,6 +310,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
try {
EntityType entityType = EntityType.getEnum(type);
Entity entity = deserializeEntity(inputStream, entityType);
+ verifySafemodeOperation(entity, EntityUtil.ENTITY_OPERATION.UPDATE);
return update(entity, type, entityName, skipDryRun);
} catch (IOException | FalconException e) {
LOG.error("Update failed", e);
@@ -435,10 +439,49 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
throws IOException, FalconException {
EntityType entityType = EntityType.getEnum(type);
Entity entity = deserializeEntity(inputStream, entityType);
+ verifySafemodeOperation(entity, EntityUtil.ENTITY_OPERATION.SUBMIT);
submitInternal(entity, doAsUser);
return entity;
}
+ protected void verifySafemodeOperation(Entity entity, EntityUtil.ENTITY_OPERATION operation) {
+ // if Falcon not in safemode, return
+ if (!StartupProperties.isServerInSafeMode()) {
+ return;
+ }
+
+ switch (operation) {
+ case UPDATE:
+ if (entity.getEntityType().equals(EntityType.CLUSTER)) {
+ return;
+ } else {
+ LOG.error("Entity operation {} is not allowed on non-cluster entities during safemode",
+ operation.name());
+ throw FalconWebException.newAPIException("Entity operation " + operation.name()
+ + " is only allowed on cluster entities during safemode");
+ }
+ case SUSPEND:
+ if (entity.getEntityType().equals(EntityType.CLUSTER)) {
+ LOG.error("Entity operation {} is not allowed on cluster entity",
+ operation.name());
+ throw FalconWebException.newAPIException("Entity operation " + operation.name()
+ + " is not allowed on cluster entity");
+ } else {
+ return;
+ }
+ case SCHEDULE:
+ case SUBMIT_AND_SCHEDULE:
+ case DELETE:
+ case RESUME:
+ case TOUCH:
+ case SUBMIT:
+ default:
+ LOG.error("Entity operation {} is not allowed during safemode", operation.name());
+ throw FalconWebException.newAPIException("Entity operation "
+ + operation.name() + " not allowed during safemode");
+ }
+ }
+
protected synchronized void submitInternal(Entity entity, String doAsUser) throws IOException, FalconException {
EntityType entityType = entity.getEntityType();
List<Entity> tokenList = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index ba183c8..528ff98 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -18,25 +18,6 @@
package org.apache.falcon.resource;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Queue;
-import java.util.Set;
-
-import javax.servlet.ServletInputStream;
-import javax.servlet.http.HttpServletRequest;
-
import com.thinkaurelius.titan.core.TitanMultiVertexQuery;
import com.thinkaurelius.titan.core.TitanVertex;
import com.thinkaurelius.titan.graphdb.blueprints.TitanBlueprintsGraph;
@@ -69,11 +50,29 @@ import org.apache.falcon.metadata.RelationshipType;
import org.apache.falcon.resource.InstancesResult.Instance;
import org.apache.falcon.resource.InstancesSummaryResult.InstanceSummary;
import org.apache.falcon.util.DeploymentUtil;
+import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.Set;
+
/**
* A base class for managing Entity's Instance operations.
*/
@@ -616,7 +615,8 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
public InstancesResult resumeInstance(HttpServletRequest request, String type, String entity, String startStr,
- String endStr, String colo, List<LifeCycle> lifeCycles) {
+ String endStr, String colo,
+ List<LifeCycle> lifeCycles) {
Properties props = getProperties(request);
return resumeInstance(props, type, entity, startStr, endStr, colo, lifeCycles);
}
@@ -625,6 +625,9 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
String colo, List<LifeCycle> lifeCycles) {
checkColo(colo);
checkType(type);
+ if (StartupProperties.isServerInSafeMode()) {
+ throwSafemodeException("RESUME");
+ }
try {
lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
validateParams(type, entity);
@@ -912,6 +915,9 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
String colo, List<LifeCycle> lifeCycles, Boolean isForced) {
checkColo(colo);
checkType(type);
+ if (StartupProperties.isServerInSafeMode()) {
+ throwSafemodeException("RERUN");
+ }
try {
lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
validateParams(type, entity);
@@ -1066,4 +1072,10 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
return earliestDate;
}
+
+ private void throwSafemodeException(String operation) {
+ String error = "Instance operation " + operation + " cannot be performed when server is in safemode";
+ LOG.error(error);
+ throw FalconWebException.newAPIException(error);
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index 864381a..c6903a4 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -18,19 +18,6 @@
package org.apache.falcon.resource;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
@@ -48,10 +35,21 @@ import org.apache.falcon.service.FeedSLAMonitoringService;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.hadoop.security.authorize.AuthorizationException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/**
* REST resource of allowed actions on Schedulable Entities, Only Process and
* Feed can have schedulable actions.
@@ -94,6 +92,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
Entity entityObj = null;
try {
entityObj = EntityUtil.getEntity(type, entity);
+ verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.SCHEDULE);
//first acquire lock on entity before scheduling
if (!memoryLocks.acquireLock(entityObj, "schedule")) {
throw FalconWebException.newAPIException("Looks like an schedule/update command is already"
@@ -221,6 +220,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
try {
checkSchedulableEntity(type);
Entity entityObj = EntityUtil.getEntity(type, entity);
+ verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.SUSPEND);
if (getWorkflowEngine(entityObj).isActive(entityObj)) {
getWorkflowEngine(entityObj).suspend(entityObj);
} else {
@@ -249,6 +249,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
try {
checkSchedulableEntity(type);
Entity entityObj = EntityUtil.getEntity(type, entity);
+ verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.RESUME);
if (getWorkflowEngine(entityObj).isActive(entityObj)) {
getWorkflowEngine(entityObj).resume(entityObj);
} else {
@@ -355,6 +356,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
StringBuilder result = new StringBuilder();
try {
Entity entity = EntityUtil.getEntity(type, entityName);
+ verifySafemodeOperation(entity, EntityUtil.ENTITY_OPERATION.TOUCH);
decorateEntityWithACL(entity);
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
for (String cluster : clusters) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java b/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java
index a75c97c..081141b 100644
--- a/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java
+++ b/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java
@@ -19,6 +19,7 @@
package org.apache.falcon.resource.admin;
import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconWebException;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.BuildProperties;
@@ -26,6 +27,8 @@ import org.apache.falcon.util.DeploymentProperties;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletResponse;
@@ -35,9 +38,11 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -47,6 +52,8 @@ import java.util.Properties;
*/
@Path("admin")
public class AdminResource {
+ public static final String SAFEMODE = "safemode";
+ private static final Logger LOG = LoggerFactory.getLogger(AdminResource.class);
/**
* Get stack trace of the falcon server.
@@ -107,6 +114,11 @@ public class AdminResource {
property.value = StartupProperties.get().getProperty("falcon.authentication.type", "simple");
props.add(property);
+ property = new Property();
+ property.key = SAFEMODE;
+ property.value = StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false");
+ props.add(property);
+
version = new PropertyList();
version.properties = props;
}
@@ -115,6 +127,34 @@ public class AdminResource {
}
/**
+ * Set safemode for falcon server.
+ *
+ * @param mode Set safemode to true/false based on mode.
+ * @return Configuration information of the server.
+ */
+ @GET
+ @Path("setSafeMode/{mode}")
+ @Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON})
+ public String setSafeMode(@PathParam("mode") String mode) {
+ LOG.info("Setting falcon server safemode property to: {}", mode);
+ try {
+ if ("true".equalsIgnoreCase(mode)) {
+ StartupProperties.createSafemodeFile();
+ } else if ("false".equalsIgnoreCase(mode)) {
+ StartupProperties.deleteSafemodeFile();
+ } else {
+ LOG.error("Bad request, Invalid value for setsafemode : {}", mode);
+ throw FalconWebException.newAPIException("Invalid value \"" + mode + "\" provided for safemode.",
+ Response.Status.BAD_REQUEST);
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to manage safemode file in Falcon Server {} ", e.getMessage());
+ throw FalconWebException.newAPIException(e.getMessage(), Response.Status.BAD_REQUEST);
+ }
+ return StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false");
+ }
+
+ /**
* Get configuration information of the falcon server.
* @param type config-type can be build, deploy, startup or runtime
* @return Configuration information of the server.
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java
index ea093c7..7447adf 100644
--- a/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java
@@ -38,20 +38,34 @@ public class AdminResourceTest {
@Test
public void testAdminVersion() throws Exception {
+ checkProperty("authentication", "simple");
+ StartupProperties.get().setProperty("falcon.authentication.type", "kerberos");
+ checkProperty("authentication", "kerberos");
+ StartupProperties.get().setProperty("falcon.authentication.type", "simple");
+ }
+
+ @Test
+ public void testSetSafemode() throws Exception {
+ checkProperty(AdminResource.SAFEMODE, "false");
+
AdminResource resource = new AdminResource();
- AdminResource.PropertyList propertyList = resource.getVersion();
- for(AdminResource.Property property : propertyList.properties) {
- if (property.key.equalsIgnoreCase("authentication")) {
- Assert.assertEquals(property.value, "simple");
- }
- }
+ String safemode = resource.setSafeMode("true");
+ Assert.assertEquals(safemode, "true");
+ Assert.assertTrue(StartupProperties.doesSafemodeFileExist());
+ checkProperty(AdminResource.SAFEMODE, "true");
- StartupProperties.get().setProperty("falcon.authentication.type", "kerberos");
- resource = new AdminResource();
- propertyList = resource.getVersion();
+ safemode = resource.setSafeMode("false");
+ Assert.assertEquals(safemode, "false");
+ Assert.assertFalse(StartupProperties.doesSafemodeFileExist());
+ checkProperty(AdminResource.SAFEMODE, "false");
+ }
+
+ private void checkProperty(String propertyName, String expectedVal) {
+ AdminResource resource = new AdminResource();
+ AdminResource.PropertyList propertyList = resource.getVersion();
for(AdminResource.Property property : propertyList.properties) {
- if (property.key.equalsIgnoreCase("authentication")) {
- Assert.assertEquals(property.value, "kerberos");
+ if (property.key.equalsIgnoreCase(propertyName)) {
+ Assert.assertEquals(property.value, expectedVal);
}
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 047fa0f..98db379 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -25,9 +25,10 @@ import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.process.LateInput;
import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.workflow.LateDataHandler;
import org.apache.falcon.rerun.event.LaterunEvent;
import org.apache.falcon.rerun.queue.DelayedQueue;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.LateDataHandler;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.conf.Configuration;
@@ -58,7 +59,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
LaterunEvent message, String entityType, String entityName) {
try {
if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
- || jobStatus.equals("SUSPENDED")) {
+ || jobStatus.equals("SUSPENDED") || StartupProperties.isServerInSafeMode()) {
LOG.debug("Re-enqueing message in LateRerunHandler for workflow with same delay as "
+ "job status is {} for : {}", jobStatus, message.getWfId());
message.setMsgInsertTime(System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/src/bin/service-start.sh
----------------------------------------------------------------------
diff --git a/src/bin/service-start.sh b/src/bin/service-start.sh
index 4766130..f11dce0 100755
--- a/src/bin/service-start.sh
+++ b/src/bin/service-start.sh
@@ -13,6 +13,17 @@
# limitations under the License. See accompanying LICENSE file.
#
+# Validate args: -setsafemode must be followed by either "true" or "false".
+args=("$@")
+for ((i=0; i < $#; i++)); do
+ if [ "-setsafemode" == "${args[$i]}" ]; then
+ if [ "false" != "${args[$i+1]}" ] && [ "true" != "${args[$i+1]}" ]; then
+ echo "Invalid argument for option -setsafemode. Acceptable values are true or false." >&2
+ exit 1
+ fi
+ fi
+done
+
# resolve links - $0 may be a softlink
PRG="${0}"
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
new file mode 100644
index 0000000..f640a69
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cli;
+
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.FalconTestUtil;
+import org.apache.falcon.util.StartupProperties;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Integration test for Falcon CLI entity and instance commands while the server is in safemode.
+ */
+@Test(groups = {"exhaustive"})
+public class FalconSafemodeCLIIT {
+ private final InMemoryWriter stream = new InMemoryWriter(System.out);
+ private final TestContext context = new TestContext();
+ private Map<String, String> overlay;
+ private static final String START_INSTANCE = "2012-04-20T00:00Z";
+
+ @BeforeClass
+ public void prepare() throws Exception {
+ TestContext.prepare();
+ FalconCLI.OUT.set(stream);
+ initSafemode();
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ try {
+ clearSafemode();
+ } finally {
+ // Always clean the store, even if leaving safemode failed.
+ TestContext.deleteEntitiesFromStore();
+ }
+ }
+
+ private void initSafemode() throws Exception {
+ overlay = context.getUniqueOverlay();
+ // Submit one cluster
+ String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+ Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + filePath), 0);
+ context.setCluster(overlay.get("cluster"));
+ Assert.assertEquals(stream.buffer.toString().trim(),
+ "falcon/default/Submit successful (cluster) " + context.getClusterName());
+
+ // Submit and schedule one feed
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type feed -file " + filePath), 0);
+
+ // Check the status of the feed that was just scheduled
+ Assert.assertEquals(executeWithURL("entity -status -type feed -name " + overlay.get("inputFeedName")), 0);
+
+ // Test the lookup command
+ Assert.assertEquals(executeWithURL("entity -lookup -type feed -path "
+ + "/falcon/test/input/2014/11/23/23"), 0);
+
+ // Set safemode
+ Assert.assertEquals(new FalconCLI().run(("admin -setsafemode true -url "
+ + TestContext.BASE_URL).split("\\s+")), 0);
+ }
+
+ private void clearSafemode() throws Exception {
+ Assert.assertEquals(new FalconCLI().run(("admin -setsafemode false -url "
+ + TestContext.BASE_URL).split("\\s+")), 0);
+ Assert.assertEquals(StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false"),
+ "false");
+ }
+
+ public void testEntityCommandsNotAllowedInSafeMode() throws Exception {
+ // Submit, submitAndSchedule, update, touch, schedule, resume and delete must all fail (-1) in safemode.
+ String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+ Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + filePath), -1);
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), -1);
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ Assert.assertEquals(executeWithURL("entity -submit -type process -doAs " + FalconTestUtil.TEST_USER_2
+ + " -file " + filePath), -1);
+
+ filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
+ Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type cluster -file " + filePath), -1);
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type feed -doAs " + FalconTestUtil.TEST_USER_2
+ + " -file " + filePath), -1);
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type process -file " + filePath), -1);
+
+ // NOTE(review): processName is never submitted here; update/touch may fail for that reason - confirm.
+ Assert.assertEquals(executeWithURL("entity -update -name " + overlay.get("processName")
+ + " -type process -file " + filePath), -1);
+
+ Assert.assertEquals(executeWithURL("entity -touch -name " + overlay.get("processName")
+ + " -type process"), -1);
+
+ Assert.assertEquals(executeWithURL("entity -schedule -type feed "
+ + " -name " + overlay.get("inputFeedName")), -1);
+
+ Assert.assertEquals(executeWithURL("entity -resume -type feed "
+ + " -name " + overlay.get("inputFeedName")), -1);
+
+ Assert.assertEquals(executeWithURL("entity -delete -type feed -name " + overlay.get("inputFeedName")), -1);
+ }
+
+ public void testEntityCommandsAllowedInSafeMode() throws Exception {
+ // Allow entity definition, suspend and summary, plus instance list and instance kill operations
+ Assert.assertEquals(executeWithURL("entity -definition -type cluster -name " + overlay.get("cluster")), 0);
+
+ Assert.assertEquals(executeWithURL("entity -suspend -type feed "
+ + " -name " + overlay.get("inputFeedName")), 0);
+
+ Assert.assertEquals(executeWithURL("entity -summary -type feed -cluster "+ overlay.get("cluster")
+ + " -fields status,tags -start " + START_INSTANCE
+ + " -filterBy TYPE:FEED -orderBy name -sortOrder asc "
+ + " -offset 0 -numResults 1 -numInstances 5"), 0);
+
+ Assert.assertEquals(executeWithURL("instance -list -type feed "
+ + " -name " + overlay.get("inputFeedName") + " -start "
+ + SchemaHelper.getDateFormat().format(new Date())), 0);
+
+ Assert.assertEquals(executeWithURL("instance -kill -type feed -name "
+ + overlay.get("inputFeedName")
+ + " -start " + START_INSTANCE + " -end " + START_INSTANCE), 0);
+ }
+
+ private int executeWithURL(String command) throws Exception {
+ FalconCLI.OUT.get().print("COMMAND IS "+command + " -url " + TestContext.BASE_URL + "\n");
+ return new FalconCLI()
+ .run((command + " -url " + TestContext.BASE_URL).split("\\s+"));
+ }
+
+ /** PrintStream that remembers the most recent string passed to println so tests can assert on it. */
+ private static class InMemoryWriter extends PrintStream {
+ private final StringBuffer buffer = new StringBuffer();
+
+ public InMemoryWriter(OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void println(String x) {
+ clear();
+ buffer.append(x);
+ super.println(x);
+ }
+
+ @SuppressWarnings("UnusedDeclaration")
+ public String getBuffer() {
+ return buffer.toString();
+ }
+
+ public void clear() {
+ buffer.delete(0, buffer.length());
+ }
+ }
+}