You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/05/03 20:23:48 UTC

falcon git commit: FALCON-1623 Implement safemode in Falcon Server

Repository: falcon
Updated Branches:
  refs/heads/master 6d4ff0bef -> 2d51db7a0


FALCON-1623 Implement safemode in Falcon Server

Author: bvellanki <bv...@hortonworks.com>

Reviewers: "yzheng-hortonworks <yz...@hortonworks.com>, Venkat Ranganathan <ve...@hortonworks.com>, Sowmya Ramesh <so...@apache.org>"

Closes #116 from bvellanki/FALCON-1623


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2d51db7a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2d51db7a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2d51db7a

Branch: refs/heads/master
Commit: 2d51db7a02394ce6856b4c622283f5afd99398eb
Parents: 6d4ff0b
Author: bvellanki <bv...@hortonworks.com>
Authored: Tue May 3 11:23:43 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Tue May 3 11:23:43 2016 -0700

----------------------------------------------------------------------
 .../org/apache/falcon/cli/FalconAdminCLI.java   |  14 ++
 .../org/apache/falcon/FalconCLIConstants.java   |   1 +
 .../org/apache/falcon/client/FalconClient.java  |  67 ++++---
 .../org/apache/falcon/entity/EntityUtil.java    |  14 ++
 .../apache/falcon/security/SecurityUtil.java    |   1 +
 .../apache/falcon/util/StartupProperties.java   |  52 +++++-
 .../workflow/engine/OozieWorkflowEngine.java    |  19 ++
 pom.xml                                         |   2 +-
 .../java/org/apache/falcon/FalconServer.java    |  24 ++-
 .../falcon/resource/AbstractEntityManager.java  |  47 ++++-
 .../resource/AbstractInstanceManager.java       |  54 +++---
 .../AbstractSchedulableEntityManager.java       |  30 +--
 .../falcon/resource/admin/AdminResource.java    |  40 ++++
 .../resource/admin/AdminResourceTest.java       |  36 ++--
 .../falcon/rerun/handler/LateRerunConsumer.java |   5 +-
 src/bin/service-start.sh                        |  11 ++
 .../apache/falcon/cli/FalconSafemodeCLIIT.java  | 183 +++++++++++++++++++
 17 files changed, 517 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java
index 56cc5b9..84439b9 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.cli;
 
+import com.sun.jersey.api.client.ClientResponse;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
@@ -55,6 +56,8 @@ public class FalconAdminCLI extends FalconCLI {
                 "show the thread stack dump");
         Option doAs = new Option(FalconCLIConstants.DO_AS_OPT, true,
                 "doAs user");
+        Option safemode = new Option(FalconCLIConstants.SAFE_MODE_OPT, true,
+                "doAs user");
         Option help = new Option("help", false, "show Falcon help");
         Option debug = new Option(FalconCLIConstants.DEBUG_OPTION, false,
                 "Use debug mode to see debugging statements on stdout");
@@ -62,6 +65,7 @@ public class FalconAdminCLI extends FalconCLI {
         group.addOption(version);
         group.addOption(stack);
         group.addOption(help);
+        group.addOption(safemode);
 
         adminOptions.addOptionGroup(group);
         adminOptions.addOption(doAs);
@@ -102,6 +106,16 @@ public class FalconAdminCLI extends FalconCLI {
         } else if (optionsList.contains(FalconCLIConstants.VERSION_OPT)) {
             result = client.getVersion(doAsUser);
             OUT.get().println("Falcon server build version: " + result);
+        } else if (optionsList.contains(FalconCLIConstants.SAFE_MODE_OPT)) {
+            String safemode = commandLine.getOptionValue(FalconCLIConstants.SAFE_MODE_OPT);
+            ClientResponse response = client.setSafemode(safemode, doAsUser);
+            if (response.getStatus() == 200) {
+                OUT.get().println("Falcon server safemode set to : " + safemode);
+            } else {
+                ERR.get().println("Unable to set Falcon server to safemode value : " + safemode);
+                ERR.get().println(response.toString());
+                exitValue = -1;
+            }
         } else if (optionsList.contains(FalconCLIConstants.HELP_CMD)) {
             OUT.get().println("Falcon Help");
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
index 436875d..1db5cfe 100644
--- a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
+++ b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
@@ -35,6 +35,7 @@ public final class FalconCLIConstants {
     public static final String ENTITY_CMD = "entity";
     public static final String INSTANCE_CMD = "instance";
     public static final String EXTENSION_CMD = "extension";
+    public static final String SAFE_MODE_OPT = "setsafemode";
 
     public static final String TYPE_OPT = "type";
     public static final String COLO_OPT = "colo";

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 36fb873..7a48973 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -18,30 +18,11 @@
 
 package org.apache.falcon.client;
 
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.net.URL;
-import java.security.SecureRandom;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.TrustManager;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
-
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HTTPSProperties;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.net.util.TrustManagerUtils;
@@ -68,11 +49,28 @@ import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
 import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
 
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.client.urlconnection.HTTPSProperties;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.security.SecureRandom;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Client API to submit and manage Falcon Entities (Cluster, Feed, Process) jobs
@@ -318,7 +316,8 @@ public class FalconClient extends AbstractFalconClient {
     protected static enum AdminOperations {
 
         STACK("api/admin/stack", HttpMethod.GET, MediaType.TEXT_PLAIN),
-        VERSION("api/admin/version", HttpMethod.GET, MediaType.APPLICATION_JSON);
+        VERSION("api/admin/version", HttpMethod.GET, MediaType.APPLICATION_JSON),
+        SAFEMODE("api/admin/setSafeMode", HttpMethod.GET, MediaType.APPLICATION_JSON);
 
         private String path;
         private String method;
@@ -706,6 +705,14 @@ public class FalconClient extends AbstractFalconClient {
         return clientResponse.getStatus();
     }
 
+    public ClientResponse setSafemode(String safemode, String doAsUser) throws FalconCLIException {
+        AdminOperations job =  AdminOperations.SAFEMODE;
+        ClientResponse clientResponse = new ResourceBuilder().path(job.path).path(safemode)
+                .addQueryParam(DO_AS_OPT, doAsUser).call(job);
+        printClientResponse(clientResponse);
+        return clientResponse;
+    }
+
     public String getDimensionList(String dimensionType, String cluster, String doAsUser) throws FalconCLIException {
         return sendMetadataDiscoveryRequest(MetadataOperations.LIST, dimensionType, null, cluster, doAsUser);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 8825a65..b181ece 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -124,6 +124,20 @@ public final class EntityUtil {
         }
     }
 
+    /**
+     *  List of entity operations.
+     */
+    public enum ENTITY_OPERATION {
+        SUBMIT,
+        UPDATE,
+        SCHEDULE,
+        SUBMIT_AND_SCHEDULE,
+        DELETE,
+        SUSPEND,
+        RESUME,
+        TOUCH
+    }
+
     private EntityUtil() {}
 
     public static <T extends Entity> T getEntity(EntityType type, String entityName) throws FalconException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
index fe04c40..7191f72 100644
--- a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
+++ b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java
@@ -141,4 +141,5 @@ public final class SecurityUtil {
             }
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/common/src/main/java/org/apache/falcon/util/StartupProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/StartupProperties.java b/common/src/main/java/org/apache/falcon/util/StartupProperties.java
index 7522b0d..92ffa04 100644
--- a/common/src/main/java/org/apache/falcon/util/StartupProperties.java
+++ b/common/src/main/java/org/apache/falcon/util/StartupProperties.java
@@ -19,7 +19,13 @@
 package org.apache.falcon.util;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -28,7 +34,14 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public final class StartupProperties extends ApplicationProperties {
 
+    public static final String SAFEMODE_PROPERTY = "falcon.safeMode";
+    private static final String SAFEMODE_FILE = ".safemode";
+    private static final String CONFIGSTORE_PROPERTY = "config.store.uri";
+    private static FileSystem fileSystem;
+    private static Path storePath;
+
     private static final String PROPERTY_FILE = "startup.properties";
+    private static final Logger LOG = LoggerFactory.getLogger(StartupProperties.class);
 
     private static final AtomicReference<StartupProperties> INSTANCE =
             new AtomicReference<StartupProperties>();
@@ -46,10 +59,47 @@ public final class StartupProperties extends ApplicationProperties {
         try {
             if (INSTANCE.get() == null) {
                 INSTANCE.compareAndSet(null, new StartupProperties());
+                storePath = new Path((INSTANCE.get().getProperty(CONFIGSTORE_PROPERTY)));
+                fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
+                String isSafeMode = (doesSafemodeFileExist()) ? "true" : "false";
+                LOG.info("Initializing Falcon StartupProperties with safemode set to {}.", isSafeMode);
+                INSTANCE.get().setProperty(SAFEMODE_PROPERTY, isSafeMode);
             }
             return INSTANCE.get();
         } catch (FalconException e) {
-            throw new RuntimeException("Unable to read application " + "startup properties", e);
+            throw new RuntimeException("Unable to read application startup properties", e);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to verify Falcon safemode", e);
         }
     }
+
+    public static void createSafemodeFile() throws IOException {
+        Path safemodeFilePath = getSafemodeFilePath();
+        if (!doesSafemodeFileExist()) {
+            boolean success = fileSystem.createNewFile(safemodeFilePath);
+            if (!success) {
+                LOG.error("Failed to create safemode file at {}", safemodeFilePath.toUri());
+                throw new IOException("Failed to create safemode file at " + safemodeFilePath.toUri());
+            }
+        }
+        INSTANCE.get().setProperty(SAFEMODE_PROPERTY, "true");
+    }
+
+    public static boolean deleteSafemodeFile() throws IOException {
+        INSTANCE.get().setProperty(SAFEMODE_PROPERTY, "false");
+        return !doesSafemodeFileExist() || fileSystem.delete(getSafemodeFilePath(), true);
+    }
+
+    public static boolean doesSafemodeFileExist() throws IOException {
+        return fileSystem.exists(getSafemodeFilePath());
+    }
+
+    private static Path getSafemodeFilePath() {
+        return new Path(storePath, SAFEMODE_FILE);
+    }
+
+    public static boolean isServerInSafeMode() {
+        return Boolean.parseBoolean(StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false"));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 05d5ef9..6b87b38 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -150,6 +150,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     @Override
     public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> suppliedProps) throws FalconException {
+        if (StartupProperties.isServerInSafeMode()) {
+            throwSafemodeException("SCHEDULE");
+        }
         Map<String, BundleJob> bundleMap = findLatestBundle(entity);
         List<String> schedClusters = new ArrayList<String>();
         for (Map.Entry<String, BundleJob> entry : bundleMap.entrySet()) {
@@ -181,6 +184,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
+    private void throwSafemodeException(String operation) throws FalconException {
+        String error = "Workflow Engine does not allow " + operation + " opeartion when Falcon server is in safemode";
+        LOG.error(error);
+        throw new FalconException(error);
+    }
+
     /**
      * Prepare the staging and logs dir for this entity with default permissions.
      *
@@ -204,6 +213,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     @Override
     public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException {
+        if (StartupProperties.isServerInSafeMode()) {
+            throwSafemodeException("DRYRUN");
+        }
         OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
         Path buildPath = new Path("/tmp", "falcon" + entity.getName() + System.currentTimeMillis());
         Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
@@ -413,6 +425,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private String doBundleAction(Entity entity, BundleAction action) throws FalconException {
+        if (StartupProperties.isServerInSafeMode() && !action.equals(BundleAction.SUSPEND)) {
+            throwSafemodeException(action.name());
+        }
         Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
         String result = null;
         for (String cluster : clusters) {
@@ -637,6 +652,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end,
                                         Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+        if (StartupProperties.isServerInSafeMode()
+                && (action.equals(JobAction.RERUN) || action.equals(JobAction.RESUME))) {
+            throwSafemodeException(action.name());
+        }
         return doJobAction(action, entity, start, end, props, lifeCycles, null);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a72889e..54863b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
                 <activeByDefault>true</activeByDefault>
             </activation>
             <properties>
-                <hadoop.version>2.7.1</hadoop.version>
+                <hadoop.version>2.6.2</hadoop.version>
             </properties>
             <dependencyManagement>
                 <dependencies>

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/FalconServer.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/FalconServer.java b/prism/src/main/java/org/apache/falcon/FalconServer.java
index ea341b3..3d9879a 100644
--- a/prism/src/main/java/org/apache/falcon/FalconServer.java
+++ b/prism/src/main/java/org/apache/falcon/FalconServer.java
@@ -27,9 +27,9 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.util.BuildProperties;
 import org.apache.falcon.util.EmbeddedServer;
+import org.apache.falcon.util.StartupProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.falcon.util.StartupProperties;
 
 /**
  * Driver for running Falcon as a standalone server with embedded jetty server.
@@ -38,6 +38,8 @@ public final class FalconServer {
     private static final Logger LOG = LoggerFactory.getLogger(FalconServer.class);
     private static final String APP_PATH = "app";
     private static final String APP_PORT = "port";
+    private static final String SAFE_MODE = "setsafemode";
+
     private static EmbeddedServer server;
     private static BrokerService broker;
 
@@ -59,6 +61,10 @@ public final class FalconServer {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option(SAFE_MODE, true, "Application mode, start safemode if true");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return new GnuParser().parse(options, args);
     }
 
@@ -88,6 +94,16 @@ public final class FalconServer {
             appPath = cmd.getOptionValue(APP_PATH);
         }
 
+        if (cmd.hasOption(SAFE_MODE)) {
+            validateSafemode(cmd.getOptionValue(SAFE_MODE));
+            boolean isSafeMode = Boolean.parseBoolean(cmd.getOptionValue(SAFE_MODE));
+            if (isSafeMode) {
+                StartupProperties.createSafemodeFile();
+            } else {
+                StartupProperties.deleteSafemodeFile();
+            }
+        }
+
         final String enableTLSFlag = StartupProperties.get().getProperty("falcon.enableTLS");
         final int appPort = getApplicationPort(cmd, enableTLSFlag);
         final boolean enableTLS = isTLSEnabled(enableTLSFlag, appPort);
@@ -102,6 +118,12 @@ public final class FalconServer {
         server.start();
     }
 
+    private static void validateSafemode(String isSafeMode) throws Exception {
+        if (!("true".equals(isSafeMode) || "false".equals(isSafeMode))) {
+            throw new Exception("Invalid value for argument safemode. Allowed values are \"true\" or \"false\"");
+        }
+    }
+
     private static int getApplicationPort(CommandLine cmd, String enableTLSFlag) {
         final int appPort;
         if (cmd.hasOption(APP_PORT)) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index c119f23..b319dd1 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -46,6 +46,7 @@ import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.io.IOUtils;
@@ -226,8 +227,9 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
             Entity entity = deserializeEntity(inputStream, entityType);
             validate(entity);
 
-            //Validate that the entity can be scheduled in the cluster
-            if (entity.getEntityType().isSchedulable()) {
+            // Validate that the entity can be scheduled in the cluster.
+            // Perform dryrun only if falcon is not in safemode.
+            if (entity.getEntityType().isSchedulable() && !StartupProperties.isServerInSafeMode()) {
                 Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
                 for (String cluster : clusters) {
                     try {
@@ -266,6 +268,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
             String removedFromEngine = "";
             try {
                 Entity entityObj = EntityUtil.getEntity(type, entity);
+                verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.DELETE);
 
                 canRemove(entityObj);
                 obtainEntityLocks(entityObj, "delete", tokenList);
@@ -307,6 +310,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
         try {
             EntityType entityType = EntityType.getEnum(type);
             Entity entity = deserializeEntity(inputStream, entityType);
+            verifySafemodeOperation(entity, EntityUtil.ENTITY_OPERATION.UPDATE);
             return update(entity, type, entityName, skipDryRun);
         } catch (IOException | FalconException e) {
             LOG.error("Update failed", e);
@@ -435,10 +439,49 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
         throws IOException, FalconException {
         EntityType entityType = EntityType.getEnum(type);
         Entity entity = deserializeEntity(inputStream, entityType);
+        verifySafemodeOperation(entity, EntityUtil.ENTITY_OPERATION.SUBMIT);
         submitInternal(entity, doAsUser);
         return entity;
     }
 
+    protected void verifySafemodeOperation(Entity entity, EntityUtil.ENTITY_OPERATION operation) {
+        // if Falcon not in safemode, return
+        if (!StartupProperties.isServerInSafeMode()) {
+            return;
+        }
+
+        switch (operation) {
+        case UPDATE:
+            if (entity.getEntityType().equals(EntityType.CLUSTER)) {
+                return;
+            } else {
+                LOG.error("Entity operation {} is not allowed on non-cluster entities during safemode",
+                        operation.name());
+                throw FalconWebException.newAPIException("Entity operation " + operation.name()
+                        + " is only allowed on cluster entities during safemode");
+            }
+        case SUSPEND:
+            if (entity.getEntityType().equals(EntityType.CLUSTER)) {
+                LOG.error("Entity operation {} is not allowed on cluster entity",
+                        operation.name());
+                throw FalconWebException.newAPIException("Entity operation " + operation.name()
+                        + " is not allowed on cluster entity");
+            } else {
+                return;
+            }
+        case SCHEDULE:
+        case SUBMIT_AND_SCHEDULE:
+        case DELETE:
+        case RESUME:
+        case TOUCH:
+        case SUBMIT:
+        default:
+            LOG.error("Entity operation {} is not allowed during safemode", operation.name());
+            throw FalconWebException.newAPIException("Entity operation "
+                    + operation.name() + " not allowed during safemode");
+        }
+    }
+
     protected synchronized void submitInternal(Entity entity, String doAsUser) throws IOException, FalconException {
         EntityType entityType = entity.getEntityType();
         List<Entity> tokenList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index ba183c8..528ff98 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -18,25 +18,6 @@
 
 package org.apache.falcon.resource;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Queue;
-import java.util.Set;
-
-import javax.servlet.ServletInputStream;
-import javax.servlet.http.HttpServletRequest;
-
 import com.thinkaurelius.titan.core.TitanMultiVertexQuery;
 import com.thinkaurelius.titan.core.TitanVertex;
 import com.thinkaurelius.titan.graphdb.blueprints.TitanBlueprintsGraph;
@@ -69,11 +50,29 @@ import org.apache.falcon.metadata.RelationshipType;
 import org.apache.falcon.resource.InstancesResult.Instance;
 import org.apache.falcon.resource.InstancesSummaryResult.InstanceSummary;
 import org.apache.falcon.util.DeploymentUtil;
+import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.Set;
+
 /**
  * A base class for managing Entity's Instance operations.
  */
@@ -616,7 +615,8 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
     }
 
     public InstancesResult resumeInstance(HttpServletRequest request, String type, String entity, String startStr,
-                                          String endStr, String colo, List<LifeCycle> lifeCycles) {
+                                          String endStr, String colo,
+                                          List<LifeCycle> lifeCycles) {
         Properties props = getProperties(request);
         return resumeInstance(props, type, entity, startStr, endStr, colo, lifeCycles);
     }
@@ -625,6 +625,9 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
                                           String colo, List<LifeCycle> lifeCycles) {
         checkColo(colo);
         checkType(type);
+        if (StartupProperties.isServerInSafeMode()) {
+            throwSafemodeException("RESUME");
+        }
         try {
             lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
             validateParams(type, entity);
@@ -912,6 +915,9 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
                                          String colo, List<LifeCycle> lifeCycles, Boolean isForced) {
         checkColo(colo);
         checkType(type);
+        if (StartupProperties.isServerInSafeMode()) {
+            throwSafemodeException("RERUN");
+        }
         try {
             lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
             validateParams(type, entity);
@@ -1066,4 +1072,10 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
         }
         return earliestDate;
     }
+
+    private void throwSafemodeException(String operation) {
+        String error = "Instance operation " + operation + " cannot be performed when server is in safemode";
+        LOG.error(error);
+        throw FalconWebException.newAPIException(error);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index 864381a..c6903a4 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -18,19 +18,6 @@
 
 package org.apache.falcon.resource;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconWebException;
@@ -48,10 +35,21 @@ import org.apache.falcon.service.FeedSLAMonitoringService;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.hadoop.security.authorize.AuthorizationException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * REST resource of allowed actions on Schedulable Entities, Only Process and
  * Feed can have schedulable actions.
@@ -94,6 +92,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
         Entity entityObj = null;
         try {
             entityObj = EntityUtil.getEntity(type, entity);
+            verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.SCHEDULE);
             //first acquire lock on entity before scheduling
             if (!memoryLocks.acquireLock(entityObj, "schedule")) {
                 throw  FalconWebException.newAPIException("Looks like an schedule/update command is already"
@@ -221,6 +220,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
         try {
             checkSchedulableEntity(type);
             Entity entityObj = EntityUtil.getEntity(type, entity);
+            verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.SUSPEND);
             if (getWorkflowEngine(entityObj).isActive(entityObj)) {
                 getWorkflowEngine(entityObj).suspend(entityObj);
             } else {
@@ -249,6 +249,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
         try {
             checkSchedulableEntity(type);
             Entity entityObj = EntityUtil.getEntity(type, entity);
+            verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.RESUME);
             if (getWorkflowEngine(entityObj).isActive(entityObj)) {
                 getWorkflowEngine(entityObj).resume(entityObj);
             } else {
@@ -355,6 +356,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
         StringBuilder result = new StringBuilder();
         try {
             Entity entity = EntityUtil.getEntity(type, entityName);
+            verifySafemodeOperation(entity, EntityUtil.ENTITY_OPERATION.TOUCH);
             decorateEntityWithACL(entity);
             Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
             for (String cluster : clusters) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java b/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java
index a75c97c..081141b 100644
--- a/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java
+++ b/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.resource.admin;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconWebException;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.BuildProperties;
@@ -26,6 +27,8 @@ import org.apache.falcon.util.DeploymentProperties;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.servlet.http.Cookie;
 import javax.servlet.http.HttpServletResponse;
@@ -35,9 +38,11 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlRootElement;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
@@ -47,6 +52,8 @@ import java.util.Properties;
  */
 @Path("admin")
 public class AdminResource {
+    public static final String SAFEMODE = "safemode";
+    private static final Logger LOG = LoggerFactory.getLogger(AdminResource.class);
 
     /**
      * Get stack trace of the falcon server.
@@ -107,6 +114,11 @@ public class AdminResource {
             property.value = StartupProperties.get().getProperty("falcon.authentication.type", "simple");
             props.add(property);
 
+            property = new Property();
+            property.key = SAFEMODE;
+            property.value = StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false");
+            props.add(property);
+
             version = new PropertyList();
             version.properties = props;
         }
@@ -115,6 +127,34 @@ public class AdminResource {
     }
 
     /**
+     * Set safemode for falcon server.
+     *
+     * @param mode Set safemode to true/false based on mode.
+     * @return Configuration information of the server.
+     */
+    @GET
+    @Path("setSafeMode/{mode}")
+    @Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON})
+    public String setSafeMode(@PathParam("mode") String mode) {
+        LOG.info("Setting falcon server safemode property to: {}", mode);
+        try {
+            if ("true".equalsIgnoreCase(mode)) {
+                StartupProperties.createSafemodeFile();
+            } else if ("false".equalsIgnoreCase(mode)) {
+                StartupProperties.deleteSafemodeFile();
+            } else {
+                LOG.error("Bad request, Invalid value for setsafemode : {}", mode);
+                throw FalconWebException.newAPIException("Invalid value \"" + mode + "\" provided for safemode.",
+                        Response.Status.BAD_REQUEST);
+            }
+        } catch (IOException e) {
+            LOG.error("Unable to manage safemode file in Falcon Server {} ", e.getMessage());
+            throw FalconWebException.newAPIException(e.getMessage(), Response.Status.BAD_REQUEST);
+        }
+        return StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false");
+    }
+
+    /**
      * Get configuration information of the falcon server.
      * @param type config-type can be build, deploy, startup or runtime
      * @return Configuration information of the server.

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java
index ea093c7..7447adf 100644
--- a/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java
@@ -38,20 +38,34 @@ public class AdminResourceTest {
 
     @Test
     public void testAdminVersion() throws Exception {
+        checkProperty("authentication", "simple");
+        StartupProperties.get().setProperty("falcon.authentication.type", "kerberos");
+        checkProperty("authentication", "kerberos");
+        StartupProperties.get().setProperty("falcon.authentication.type", "simple");
+    }
+
+    @Test
+    public void testSetSafemode() throws Exception {
+        checkProperty(AdminResource.SAFEMODE, "false");
+
         AdminResource resource = new AdminResource();
-        AdminResource.PropertyList propertyList = resource.getVersion();
-        for(AdminResource.Property property : propertyList.properties) {
-            if (property.key.equalsIgnoreCase("authentication")) {
-                Assert.assertEquals(property.value, "simple");
-            }
-        }
+        String safemode = resource.setSafeMode("true");
+        Assert.assertEquals(safemode, "true");
+        Assert.assertTrue(StartupProperties.doesSafemodeFileExist());
+        checkProperty(AdminResource.SAFEMODE, "true");
 
-        StartupProperties.get().setProperty("falcon.authentication.type", "kerberos");
-        resource = new AdminResource();
-        propertyList = resource.getVersion();
+        safemode = resource.setSafeMode("false");
+        Assert.assertEquals(safemode, "false");
+        Assert.assertFalse(StartupProperties.doesSafemodeFileExist());
+        checkProperty(AdminResource.SAFEMODE, "false");
+    }
+
+    private void checkProperty(String propertyName, String expectedVal) {
+        AdminResource resource = new AdminResource();
+        AdminResource.PropertyList propertyList = resource.getVersion();
         for(AdminResource.Property property : propertyList.properties) {
-            if (property.key.equalsIgnoreCase("authentication")) {
-                Assert.assertEquals(property.value, "kerberos");
+            if (property.key.equalsIgnoreCase(propertyName)) {
+                Assert.assertEquals(property.value, expectedVal);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 047fa0f..98db379 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -25,9 +25,10 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.process.LateInput;
 import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.workflow.LateDataHandler;
 import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.queue.DelayedQueue;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.LateDataHandler;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.conf.Configuration;
@@ -58,7 +59,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
                                LaterunEvent message, String entityType, String entityName) {
         try {
             if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP")
-                    || jobStatus.equals("SUSPENDED")) {
+                    || jobStatus.equals("SUSPENDED") || StartupProperties.isServerInSafeMode()) {
                 LOG.debug("Re-enqueing message in LateRerunHandler for workflow with same delay as "
                         + "job status is {} for : {}", jobStatus, message.getWfId());
                 message.setMsgInsertTime(System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/src/bin/service-start.sh
----------------------------------------------------------------------
diff --git a/src/bin/service-start.sh b/src/bin/service-start.sh
index 4766130..f11dce0 100755
--- a/src/bin/service-start.sh
+++ b/src/bin/service-start.sh
@@ -13,6 +13,17 @@
 #  limitations under the License. See accompanying LICENSE file.
 #
 
+# validate args
+args=("$@")
+for ((i=0; i < $#; i++)) {
+   if [ "-setsafemode" == "${args[$i]}" ]; then
+      if [ "false" != "${args[$i+1]}" ] && [ "true" != "${args[$i+1]}" ]; then
+        echo "Invalid argument for option -safemode. Acceptable values are true or false."
+        exit 1
+      fi
+   fi
+}
+
 # resolve links - $0 may be a softlink
 PRG="${0}"
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
new file mode 100644
index 0000000..f640a69
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cli;
+
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.FalconTestUtil;
+import org.apache.falcon.util.StartupProperties;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Test for Falcon CLI.
+ */
+@Test(groups = {"exhaustive"})
+public class FalconSafemodeCLIIT {
+    private InMemoryWriter stream = new InMemoryWriter(System.out);
+    private TestContext context = new TestContext();
+    private Map<String, String> overlay;
+    private static final String START_INSTANCE = "2012-04-20T00:00Z";
+
+    @BeforeClass
+    public void prepare() throws Exception {
+        TestContext.prepare();
+        FalconCLI.OUT.set(stream);
+        initSafemode();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        clearSafemode();
+        TestContext.deleteEntitiesFromStore();
+
+    }
+
+    private void initSafemode() throws Exception {
+        overlay = context.getUniqueOverlay();
+        // Submit one cluster
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + filePath), 0);
+        context.setCluster(overlay.get("cluster"));
+        Assert.assertEquals(stream.buffer.toString().trim(),
+                "falcon/default/Submit successful (cluster) " + context.getClusterName());
+
+        // Submit and schedule one feed
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type feed -file " + filePath), 0);
+
+        // Schedule the feed
+        Assert.assertEquals(executeWithURL("entity -status -type feed -name " + overlay.get("inputFeedName")), 0);
+
+        // Test the lookup command
+        Assert.assertEquals(executeWithURL("entity -lookup -type feed -path "
+                + "/falcon/test/input/2014/11/23/23"), 0);
+
+        // Set safemode
+        Assert.assertEquals(new FalconCLI().run(("admin -setsafemode true -url "
+                + TestContext.BASE_URL).split("\\s")), 0);
+    }
+
+    private void clearSafemode() throws Exception {
+        Assert.assertEquals(new FalconCLI().run(("admin -setsafemode false -url "
+                + TestContext.BASE_URL).split("\\s")), 0);
+        Assert.assertEquals(StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false"),
+                "false");
+    }
+
+    public void testEntityCommandsNotAllowedInSafeMode() throws Exception {
+        String filePath;
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + filePath), -1);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), -1);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type process -doAs " + FalconTestUtil.TEST_USER_2
+                + " -file " + filePath), -1);
+
+        filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
+        Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type cluster -file " + filePath), -1);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type feed -doAs " + FalconTestUtil.TEST_USER_2
+                + " -file " + filePath), -1);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type process -file " + filePath), -1);
+
+        Assert.assertEquals(executeWithURL("entity -update -name " + overlay.get("processName")
+                + " -type process -file " + filePath), -1);
+
+        Assert.assertEquals(executeWithURL("entity -touch -name " + overlay.get("processName")
+                + " -type process"), -1);
+
+        Assert.assertEquals(executeWithURL("entity -schedule -type feed "
+                + " -name " + overlay.get("inputFeedName")), -1);
+
+        Assert.assertEquals(executeWithURL("entity -resume -type feed "
+                + " -name " + overlay.get("inputFeedName")), -1);
+
+        Assert.assertEquals(executeWithURL("entity -delete -type feed -name " + overlay.get("inputFeedName")), -1);
+
+
+    }
+
+    public void testEntityCommandsAllowedInSafeMode() throws Exception {
+        // Allow definition, summary, list, suspend operations
+        Assert.assertEquals(executeWithURL("entity -definition -type cluster -name " + overlay.get("cluster")), 0);
+
+        Assert.assertEquals(executeWithURL("entity -suspend -type feed "
+                + " -name " + overlay.get("inputFeedName")), 0);
+
+        Assert.assertEquals(executeWithURL("entity -summary -type feed -cluster "+ overlay.get("cluster")
+                + " -fields status,tags -start " + START_INSTANCE
+                + " -filterBy TYPE:FEED -orderBy name -sortOrder asc "
+                + " -offset 0 -numResults 1 -numInstances 5"), 0);
+
+        Assert.assertEquals(executeWithURL("instance -list -type feed "
+                + " -name " + overlay.get("inputFeedName") + " -start "
+                + SchemaHelper.getDateFormat().format(new Date())), 0);
+
+        Assert.assertEquals(executeWithURL("instance -kill -type feed -name "
+                + overlay.get("inputFeedName")
+                + " -start " + START_INSTANCE + " -end " + START_INSTANCE), 0);
+
+    }
+
+    private int executeWithURL(String command) throws Exception {
+        FalconCLI.OUT.get().print("COMMAND IS "+command + " -url " + TestContext.BASE_URL + "\n");
+        return new FalconCLI()
+                .run((command + " -url " + TestContext.BASE_URL).split("\\s+"));
+    }
+
+    private static class InMemoryWriter extends PrintStream {
+
+        private StringBuffer buffer = new StringBuffer();
+
+        public InMemoryWriter(OutputStream out) {
+            super(out);
+        }
+
+        @Override
+        public void println(String x) {
+            clear();
+            buffer.append(x);
+            super.println(x);
+        }
+
+        @SuppressWarnings("UnusedDeclaration")
+        public String getBuffer() {
+            return buffer.toString();
+        }
+
+        public void clear() {
+            buffer.delete(0, buffer.length());
+        }
+    }
+}