You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by mw...@apache.org on 2017/08/04 15:19:37 UTC

[fluo] branch master updated: Fixes #842 Support multiple ways for running Fluo applications (#883)

This is an automated email from the ASF dual-hosted git repository.

mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git


The following commit(s) were added to refs/heads/master by this push:
     new cf608fe  Fixes #842 Support multiple ways for running Fluo applications (#883)
cf608fe is described below

commit cf608fe5da4ae0e81aac5e399181961287b2ceac
Author: Mike Walch <mw...@apache.org>
AuthorDate: Fri Aug 4 11:19:35 2017 -0400

    Fixes #842 Support multiple ways for running Fluo applications (#883)
    
    * Split fluo.properties into connection.properties and application.properties
    * Renamed several properties to support split
    * Support loading FluoConfiguration from multiple files
    * Created main methods commands: FluoOracle, FluoWorker,
      FluoGetJars, FluoInit, FluoList, FluoWait, FluoScan
    * Updated documentation to reflect changes
    * Moved shared scanning code to ScanUtil
    * Deprecated all classes in fluo-cluster module
    * Fluo only depends on Twill if using deprecated commands
    * Added 'fluo config' command
---
 docs/applications.md                               |  20 +-
 docs/grafana.md                                    |   2 +-
 docs/install.md                                    | 175 +++-------
 docs/metrics.md                                    |   8 +-
 .../java/org/apache/fluo/api/client/FluoAdmin.java |  35 +-
 .../org/apache/fluo/api/client/FluoFactory.java    |  26 +-
 .../apache/fluo/api/config/FluoConfiguration.java  | 381 +++++++++++++++++----
 .../fluo/api/config/SimpleConfiguration.java       |  58 ++--
 .../fluo/api/config/FluoConfigurationTest.java     | 116 ++++++-
 modules/api/src/test/resources/fluo-app.properties | 103 ++++++
 .../api/src/test/resources/fluo-conn.properties    |  30 ++
 .../src/test/resources}/fluo.properties            |  23 +-
 modules/cluster/pom.xml                            |   5 -
 .../apache/fluo/cluster/command/FluoCommand.java   |   1 +
 .../fluo/cluster/runnable/OracleRunnable.java      |   1 +
 .../fluo/cluster/runnable/WorkerRunnable.java      |   3 +-
 .../org/apache/fluo/cluster/runner/AppRunner.java  | 242 ++++---------
 .../fluo/cluster/runner/ClusterAppRunner.java      |   1 +
 .../apache/fluo/cluster/runner/ScanOptions.java    |  77 -----
 .../apache/fluo/cluster/runner/YarnAppRunner.java  |   1 +
 .../org/apache/fluo/cluster/util/ClusterUtil.java  |   1 +
 .../org/apache/fluo/cluster/util/FluoInstall.java  |   1 +
 .../apache/fluo/cluster/util/FluoYarnConfig.java   |   1 +
 .../org/apache/fluo/cluster/util/LogbackUtil.java  |   1 +
 .../apache/fluo/cluster/util/ValidateAppName.java  |   1 +
 .../org/apache/fluo/cluster/yarn/FluoTwillApp.java |   1 +
 .../org/apache/fluo/cluster/yarn/TwillUtil.java    |   1 +
 modules/cluster/src/test/resources/log4j.xml       |  42 ---
 modules/{cluster => command}/pom.xml               |  60 +---
 .../java/org/apache/fluo/command/CommandUtil.java} |  23 +-
 .../java/org/apache/fluo/command/FluoConfig.java   |  63 ++++
 .../java/org/apache/fluo/command/FluoExec.java     |  78 +++++
 .../java/org/apache/fluo/command/FluoGetJars.java  |  78 +++++
 .../java/org/apache/fluo/command/FluoGetProp.java  |  59 ++++
 .../java/org/apache/fluo/command/FluoInit.java}    |  77 +++--
 .../java/org/apache/fluo/command/FluoList.java     |  76 ++++
 .../java/org/apache/fluo/command/FluoOracle.java   |  58 ++++
 .../java/org/apache/fluo/command/FluoScan.java     | 139 ++++++++
 .../java/org/apache/fluo/command/FluoWait.java     | 116 +++++++
 .../java/org/apache/fluo/command/FluoWorker.java   |  58 ++++
 .../java/org/apache/fluo/command}/ScanTest.java    |  10 +-
 .../org/apache/fluo/core/client/FluoAdminImpl.java | 204 ++++++++++-
 .../apache/fluo/core/client/FluoClientImpl.java    |  22 +-
 .../org/apache/fluo/core/impl/Environment.java     |  16 +-
 .../fluo/core/impl/FluoConfigurationImpl.java      |   3 +
 .../fluo/core/observer/v1/ObserverStoreV1.java     |   2 +-
 .../apache/fluo/core/oracle/FluoOracleImpl.java    |  10 +-
 .../org/apache/fluo/core/oracle/OracleClient.java  |   2 +-
 .../java/org/apache/fluo/core/util/ScanUtil.java}  | 195 +++--------
 .../apache/fluo/core/worker/FluoWorkerImpl.java    |   8 +-
 modules/distribution/pom.xml                       |   5 +
 modules/distribution/src/main/assembly/bin.xml     |  17 +-
 .../src/main/config/fluo-app.properties            | 107 ++++++
 .../src/main/config/fluo-conn.properties           |  30 ++
 modules/distribution/src/main/config/fluo-env.sh   |  60 +++-
 ...{fluo.properties => fluo.properties.deprecated} |   5 +-
 .../distribution/src/main/config/log4j.properties  |  33 ++
 modules/distribution/src/main/config/log4j.xml     |  44 ---
 modules/distribution/src/main/lib/fetch.sh         |  40 ++-
 modules/distribution/src/main/scripts/fluo         | 284 +++++++++++----
 .../distribution/src/main/scripts/impl/config.sh   |  67 ----
 .../src/main/scripts/impl/fluo-version.sh          |  17 -
 modules/distribution/src/main/scripts/local-fluo   |  21 +-
 .../fluo/integration/client/FluoAdminImplIT.java   |  26 ++
 .../fluo/integration/client/FluoClientIT.java}     |  51 ++-
 .../java/org/apache/fluo/mini/MiniFluoImpl.java    |   2 +-
 pom.xml                                            |   6 +
 67 files changed, 2418 insertions(+), 1111 deletions(-)

diff --git a/docs/applications.md b/docs/applications.md
index 2cce6c0..c76f48d 100644
--- a/docs/applications.md
+++ b/docs/applications.md
@@ -45,21 +45,20 @@ allow the scripts to use the versions of Hadoop, Accumulo, and Zookeeper install
 To create a [FluoClient], you will need to provide it with a [FluoConfiguration] object that is
 configured to connect to your Fluo instance.
 
-If you have access to the [fluo.properties] file that was used to configure your Fluo instance, you
-can use it to build a [FluoConfiguration] object with all necessary properties which are all
-properties with the `fluo.client.*` prefix in [fluo.properties]:
+If you have access to the [fluo-conn.properties] file that was used to configure your Fluo instance, you
+can use it to build a [FluoConfiguration] object with all necessary properties:
 
 ```java
-FluoConfiguration config = new FluoConfiguration(new File("fluo.properties"));
+FluoConfiguration config = new FluoConfiguration(new File("fluo-conn.properties"));
+config.setApplicationName("myapp");
 ```
 
 You can also create an empty [FluoConfiguration] object and set properties using Java:
 
 ```java
 FluoConfiguration config = new FluoConfiguration();
-config.setAccumuloUser("user");
-config.setAccumuloPassword("pass");
-config.setAccumuloInstance("instance");
+config.setInstanceZookeepers("localhost/fluo");
+config.setApplicationName("myapp");
 ```
 
 Once you have [FluoConfiguration] object, pass it to the `newClient()` method of [FluoFactory] to
@@ -141,8 +140,8 @@ To create an observer, follow these steps:
 
 3.  Build a jar containing these classes and include this jar in the `lib/` directory of your Fluo
     application.
-4.  Configure your Fluo instance to use this observer provider by modifying the Observer section of
-    [fluo.properties].
+4.  Configure your Fluo instance to use this observer provider by modifying the Application section of
+    [fluo-app.properties].
 5.  Initialize Fluo.  During initialization Fluo will obtain the observed columns from the 
     ObserverProvider and persist the columns in Zookeeper.  These columns persisted in Zookeeper
     are used by transactions to know when to trigger observers.
@@ -216,7 +215,8 @@ where D is a hex digit. Also the `\` character is escaped to make the output una
 [FluoConfiguration]: ../modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
 [Observer]: ../modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
 [ObserverProvider]: ../modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
-[fluo.properties]: ../modules/distribution/src/main/config/fluo.properties
+[fluo-conn.properties]: ../modules/distribution/src/main/config/fluo-conn.properties
+[fluo-app.properties]: ../modules/distribution/src/main/config/fluo-app.properties
 [API]: https://fluo.apache.org/apidocs/
 [metrics]: metrics.md
 [slf4j]: http://www.slf4j.org/
diff --git a/docs/grafana.md b/docs/grafana.md
index 54f6397..2b6d79c 100644
--- a/docs/grafana.md
+++ b/docs/grafana.md
@@ -62,7 +62,7 @@ Follow the instructions below to setup InfluxDB and Grafana.
     $INFLUXDB_HOME/bin/influx -import -path $FLUO_HOME/contrib/influxdb/fluo_metrics_setup.txt
     ```
 
-3. Configure `fluo.properties` in your Fluo app configuration to send Graphite metrics to InfluxDB.
+3. Configure the `fluo-app.properties` of your Fluo application to send Graphite metrics to InfluxDB.
    Below is example configuration. Remember to replace `<INFLUXDB_HOST>` with the actual host.
 
     ```
diff --git a/docs/install.md b/docs/install.md
index c905da8..7f7582b 100644
--- a/docs/install.md
+++ b/docs/install.md
@@ -17,7 +17,7 @@ limitations under the License.
 
 # Fluo Install Instructions
 
-Instructions for installing Apache Fluo and starting a Fluo application in YARN on a cluster where
+Instructions for installing Apache Fluo and starting a Fluo application on a cluster where
 Accumulo, Hadoop & Zookeeper are running.  If you need help setting up these dependencies, see the
 [related projects page][related] for external projects that may help.
 
@@ -50,24 +50,21 @@ After you obtain a Fluo distribution tarball, follow these steps to install Fluo
 1.  Choose a directory with plenty of space and untar the distribution:
 
         tar -xvzf fluo-1.1.0-incubating-bin.tar.gz
+        cd fluo-1.1.0-incubating
 
-2.  Copy the example configuration to the base of your configuration directory to create the default
-    configuration for your Fluo install:
+    The distribution contains a `fluo` script in `bin/` that administers Fluo and the
+    following configuration files in `conf/`:
 
-        cp conf/examples/* conf/
+    | Configuration file           | Description                                                                                  |
+    |------------------------------|----------------------------------------------------------------------------------------------|
+    | [fluo-env.sh]                | Configures classpath for `fluo` script. Required for all commands.                           |
+    | [fluo-conn.properties]       | Configures connection to Fluo. Required for all commands.                                    |
+    | [fluo-app.properties]        | Template for configuration file passed to `fluo init` when initializing Fluo application.    |
+    | [log4j.properties]           | Configures logging                                                                           |
+    | [fluo.properties.deprecated] | Deprecated Fluo configuration file. Replaced by fluo-conn.properties and fluo-app.properties |
 
-    The default configuration will be used as the base configuration for each new application.
-
-3.  Modify [fluo.properties] for your environment. However, you should not configure any
-    application settings (like observers).
-
-    NOTE - All properties that have a default are set with it. Uncomment a property if you want
-    to use a value different than the default. Properties that are unset and uncommented must be
-    set by the user.
-
-4. Fluo needs to build its classpath using jars from the versions of Hadoop, Accumulo, and
-Zookeeper that you are using. Choose one of the two ways below to make these jars available
-to Fluo:
+2.  Configure [fluo-env.sh] to set up your classpath using jars from the versions of Hadoop, Accumulo, and
+Zookeeper that you are using. Choose one of the two ways below to make these jars available to Fluo:
 
     * Set `HADOOP_PREFIX`, `ACCUMULO_HOME`, and `ZOOKEEPER_HOME` in your environment or configure
     these variables in [fluo-env.sh]. Fluo will look in these locations for jars.
@@ -78,12 +75,12 @@ to Fluo:
 
             ./lib/fetch.sh ahz -Daccumulo.version=1.7.2 -Dhadoop.version=2.7.2 -Dzookeeper.version=3.4.8
 
-5. Fluo needs more dependencies than what is available from Hadoop, Accumulo, and Zookeeper. These
+3. Fluo needs more dependencies than what is available from Hadoop, Accumulo, and Zookeeper. These
    extra dependencies need to be downloaded to `lib/` using the command below:
 
         ./lib/fetch.sh extra
 
-You are now ready to use the Fluo command script.
+You are now ready to use the `fluo` script.
 
 ## Fluo command script
 
@@ -104,101 +101,63 @@ Running the script without any arguments prints a description of all commands.
 
     ./bin/fluo
 
-## Configure a Fluo application
-
-You are now ready to configure a Fluo application. Use the command below to create the
-configuration necessary for a new application. Feel free to pick a different name (other than
-`myapp`) for your application:
-
-    fluo new myapp
-
-This command will create a directory for your application at `apps/myapp` of your Fluo install which
-will contain a `conf` and `lib`.
-
-The `apps/myapp/conf` directory contains a copy of the `fluo.properties` from your default
-configuration. This should be configured for your application:
-
-    vim apps/myapp/fluo.properties
-
-When configuring the observer section in fluo.properties, you can configure your instance for the
-[phrasecount] application if you have not created your own application. See the [phrasecount]
-example for instructions. You can also choose not to configure any observers but your workers will
-be idle when started.
-
-The `apps/myapp/lib` directory should contain any observer jars for your application. If you
-configured [fluo.properties] for observers, copy any jars containing these observer classes to this
-directory.
-
-## Initialize your application
-
-After your application has been configured, use the command below to initialize it:
+## Initialize Fluo application
 
-    fluo init myapp
+1. Create a copy of [fluo-app.properties] for your Fluo application. 
 
-This only needs to be called once and stores configuration in Zookeeper.
+        cp conf/fluo-app.properties /path/to/myapp/fluo-app.properties
 
-## Start your application
+2. Edit your copy of [fluo-app.properties] and make sure to set the following:
 
-A Fluo application consists of one oracle process and multiple worker processes. Before starting
-your application, you can configure the number of worker process in your [fluo.properties] file.
+    * Class name of your ObserverProvider
+    * Paths to your Fluo observer jars
+    * Accumulo configuration
+    * HDFS configuration
 
-When you are ready to start your Fluo application on your YARN cluster, run the command below:
+   When configuring the observer section of fluo-app.properties, you can configure your instance for the
+   [phrasecount] application if you have not created your own application. See the [phrasecount]
+   example for instructions. You can also choose not to configure any observers but your workers will
+   be idle when started.
 
-    fluo start myapp
+3. Run the command below to initialize your Fluo application. Change `myapp` to your application name:
 
-The start command above will work for a single-node or a large cluster. By using YARN, you do not
-need to deploy the Fluo binaries to every node on your cluster or start processes on every node.
+        fluo init myapp /path/to/myapp/fluo-app.properties
 
-You can use the following command to check the status of your instance:
+   A Fluo application only needs to be initialized once. After initialization, the Fluo application
+   name is used to start/stop the application and scan the Fluo table.
 
-    fluo status myapp
+4. Run `fluo list` which connects to Fluo and lists applications to verify initialization.
 
-For more detailed information on the YARN containers running Fluo:
+## Start your Fluo application
 
-    fluo info myapp
+Follow the instructions below to start a Fluo application which contains an oracle and multiple workers.
 
-You can also use `yarn application -list` to check the status of your Fluo instance in YARN. 
+1. Configure [fluo-env.sh] and [fluo-conn.properties] if you have not already.
 
-## View Fluo application logs
+2. Run Fluo application processes using the `fluo oracle` and `fluo worker` commands. Fluo applications
+   are typically run with one oracle process and multiple worker processes. The commands below will start
+   a Fluo oracle and two workers on your local machine:
 
-Fluo application logs are viewable within YARN using the methods below:
+        fluo oracle myapp
+        fluo worker myapp
+        fluo worker myapp
 
-1. View logs using the web interface of the the YARN resource manager
-(`http://<resource manager>:8088/cluster`). First, click on the application ID (i.e `application_*`)
-of your Fluo application and then click on the latest attempt ID (i.e `appattempt_*`). You should
-see a list of containers. There should be a container for the application master (typically
-container 1), a Fluo oracle (typically container 2), and Fluo workers (containers 3+). You can view
-the log files produced by a container by clicking on its 'logs' link. Logs from Fluo observers will
-be in the `worker_*.log` file for each of your worker containers. 
+   The commands will retrieve your application configuration and observer jars (using your
+   application name) before starting the oracle or worker process.
 
-2. View logs on disk in the directory specified by the YARN property `yarn.nodemanager.log-dirs` in
-your YARN configuration `yarn-site.xml` (see [yarn-default.xml] for more info about this property).
-If you are running Fluo on single machine, this method works well. If you are running Fluo on a
-cluster, your containers and their logs will be spread across the cluster.
+The oracle & worker logs can be found in the directory `logs/<applicationName>` of your Fluo installation.
 
-When running Fluo in YARN, Fluo must use [logback] for logging (due to a hard requirement by Twill).
-Logback is configured using `/path/to/fluo/apps/<app_name>/conf/logback.xml`. You should review this
-configuration but the root logger is configured by default to print any message that is debug level
-or higher.
+If you want to distribute the processes of your Fluo application across a cluster, you will need install
+Fluo on every node where you want to run a Fluo process and follow the instructions above on each node.
 
-In addition the `*.log` files, Fluo oracle and worker containers will have `stdout` and `stderr`
-files. These files may have helpful error messages. Especially, if a process failed to start
-or calls were made to `System.out` or `System.err` in your application.
+## Manage your Fluo application
 
-## View Fluo application data
+When you have data in your Fluo application, you can view it using the command `fluo scan myapp`. 
+Pipe the output to `less` using the command `fluo scan myapp | less` if you want to page through the data.
 
-When you have data in your Fluo application, you can view it using the command `fluo scan`. Pipe the
-output to `less` using the command `fluo scan | less` if you want to page through the data.
+To list all Fluo applications, run `fluo list`.
 
-## Stop your Fluo application
-
-Use the following command to stop your Fluo application:
-
-    fluo stop myapp
-
-If stop fails, there is also a kill command.
-
-    fluo kill myapp
+To stop your Fluo application, run `jps -m | grep Fluo` to find process IDs and use `kill` to stop them.
 
 ## Tuning Accumulo
 
@@ -213,31 +172,6 @@ probably increase the `tserver.server.threads.minimum` Accumulo setting.
 
 Using at least Accumulo 1.6.1 is recommended because multiple performance bugs were fixed.
 
-## Tuning YARN
-
-When running Fluo oracles and workers in YARN, the number of instances, max memory, and number of
-cores for Fluo processes can be configured in [fluo.properties]. If YARN is killing processes
-consider increasing `twill.java.reserved.memory.mb` (which defaults to 200 and is set in
-yarn-site.xml). The `twill.java.reserved.memory.mb` config determines the gap between the YARN
-memory limit set in [fluo.properties] and the java -Xmx setting. For example, if max memory is 1024
-and twill reserved memory is 200, the java -Xmx setting will be 1024-200 = 824 MB.
-
-## Run locally without YARN
-
-If you do not have YARN set up, you can start the oracle and worker as a local Fluo process using
-the following commands:
-
-    local-fluo start-oracle
-    local-fluo start-worker
-
-Use the following commands to stop a local Fluo process:
-
-    local-fluo stop-worker
-    local-fluo stop-oracle
-
-In a distributed environment, you will need to deploy and configure a Fluo distribution on every
-node in your cluster.
-
 [Accumulo]: https://accumulo.apache.org/
 [Hadoop]: http://hadoop.apache.org/
 [Zookeeper]: http://zookeeper.apache.org/
@@ -245,8 +179,9 @@ node in your cluster.
 [related]: https://fluo.apache.org/related-projects/
 [release]: https://fluo.apache.org/download/
 [phrasecount]: https://github.com/fluo-io/phrasecount
-[fluo.properties]: ../modules/distribution/src/main/config/fluo.properties
+[fluo-conn.properties]: ../modules/distribution/src/main/config/fluo-conn.properties
+[fluo-app.properties]: ../modules/distribution/src/main/config/fluo-app.properties
+[log4j.properties]: ../modules/distribution/src/main/config/log4j.properties
+[fluo.properties.deprecated]: ../modules/distribution/src/main/config/fluo.properties.deprecated
 [fluo-env.sh]: ../modules/distribution/src/main/config/fluo-env.sh
 [lib/ahz/pom.xml]: ../modules/distribution/src/main/lib/ahz/pom.xml
-[yarn-default.xml]: https://hadoop.apache.org/docs/r2.7.0/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
-[logback]: http://logback.qos.ch/
diff --git a/docs/metrics.md b/docs/metrics.md
index 4240394..8db93c7 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -17,16 +17,16 @@ limitations under the License.
 
 # Fluo Metrics
 
-A Fluo application can be configured (in [fluo.properties]) to report metrics. When metrics are
+A Fluo application can be configured (in [fluo-app.properties]) to report metrics. When metrics are
 configured, Fluo will report some 'default' metrics about an application that help users monitor its
 performance. Users can also write code to report 'application-specific' metrics from their
 applications. Both 'application-specific' and 'default' metrics share the same reporter configured
-by [fluo.properties] and are described in detail below.
+by [fluo-app.properties] and are described in detail below.
 
 ## Configuring reporters
 
 Fluo metrics are not published by default. To publish metrics, configure a reporter in the 'metrics'
-section of [fluo.properties]. There are several different reporter types (i.e Console, CSV,
+section of [fluo-app.properties]. There are several different reporter types (i.e Console, CSV,
 Graphite, JMX, SLF4J) that are implemented using [Dropwizard]. The choice of which reporter to use
 depends on the visualization tool used. If you are not currently using a visualization tool, there
 is [documentation][grafana] for reporting Fluo metrics to Grafana/InfluxDB.
@@ -116,7 +116,7 @@ the metric was updated and not a sum of the updates. For example if a request fo
 made to the oracle followed by a request for 3 timestamps, then the count for `oracle_server_stamps`
 would be 2 and the mean would be (5+3)/2.
 
-[fluo.properties]: ../modules/distribution/src/main/config/fluo.properties
+[fluo-app.properties]: ../modules/distribution/src/main/config/fluo-app.properties
 [Dropwizard]: https://dropwizard.github.io/metrics/3.1.0/
 [grafana]: grafana.md
 [MetricsReporter]: ../modules/api/src/main/java/org/apache/fluo/api/metrics/MetricsReporter.java
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/FluoAdmin.java b/modules/api/src/main/java/org/apache/fluo/api/client/FluoAdmin.java
index 70a5381..c0558f2 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/FluoAdmin.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/FluoAdmin.java
@@ -16,6 +16,7 @@
 package org.apache.fluo.api.client;
 
 import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
 
 /**
  * Provides methods for initializing and administering a Fluo application.
@@ -39,7 +40,7 @@ public interface FluoAdmin extends AutoCloseable {
 
     /**
      * Clears zookeeper root (if exists) specified by
-     * {@value FluoConfiguration#CLIENT_ZOOKEEPER_CONNECT_PROP}. Default is false.
+     * {@value FluoConfiguration#CONNECTION_ZOOKEEPERS_PROP}. Default is false.
      */
     public InitializationOptions setClearZookeeper(boolean clearZookeeper) {
       this.clearZookeeper = clearZookeeper;
@@ -51,8 +52,8 @@ public interface FluoAdmin extends AutoCloseable {
     }
 
     /**
-     * Clears accumulo table (if exists) specified by
-     * {@value FluoConfiguration#ADMIN_ACCUMULO_TABLE_PROP}. Default is false.
+     * Clears accumulo table (if exists) specified by {@value FluoConfiguration#ACCUMULO_TABLE_PROP}
+     * . Default is false.
      */
     public InitializationOptions setClearTable(boolean clearTable) {
       this.clearTable = clearTable;
@@ -107,9 +108,8 @@ public interface FluoAdmin extends AutoCloseable {
 
   /**
    * Initializes Fluo application and stores shared configuration in Zookeeper. Shared configuration
-   * consists of properties with {@value org.apache.fluo.api.config.FluoConfiguration#APP_PREFIX},
-   * {@value org.apache.fluo.api.config.FluoConfiguration#OBSERVER_PREFIX} and
-   * {@value org.apache.fluo.api.config.FluoConfiguration#TRANSACTION_PREFIX} prefixes. Throws
+   * consists of all properties except those with
+   * {@value org.apache.fluo.api.config.FluoConfiguration#CONNECTION_PREFIX} prefix. Throws
    * {@link AlreadyInitializedException} if Fluo application was already initialized in Zookeeper.
    * If you want to initialize Zookeeper again, set
    * {@link InitializationOptions#setClearZookeeper(boolean)} to true. Throws
@@ -120,12 +120,11 @@ public interface FluoAdmin extends AutoCloseable {
       TableExistsException;
 
   /**
-   * Updates shared configuration in Zookeeper. Shared configuration consists of properties with
-   * {@value org.apache.fluo.api.config.FluoConfiguration#APP_PREFIX},
-   * {@value org.apache.fluo.api.config.FluoConfiguration#OBSERVER_PREFIX} and
-   * {@value org.apache.fluo.api.config.FluoConfiguration#TRANSACTION_PREFIX} prefixes. This method
-   * is called if a user has previously called {@link #initialize(InitializationOptions)} but wants
-   * changes to shared configuration updated in Zookeeper.
+   * Updates shared configuration in Zookeeper. Shared configuration consists of all properties
+   * except those with {@value org.apache.fluo.api.config.FluoConfiguration#CONNECTION_PREFIX}
+   * prefix. This method is called if a user has previously called
+   * {@link #initialize(InitializationOptions)} but wants changes to shared configuration updated in
+   * Zookeeper.
    * 
    * <p>
    * During this method Observers are reinitialized using configuration passed to FluoAdmin and not
@@ -134,6 +133,18 @@ public interface FluoAdmin extends AutoCloseable {
    */
   void updateSharedConfig();
 
+  /**
+   * @return SimpleConfiguration containing connection-specific configuration passed to FluoFactory
+   * @since 1.2.0
+   */
+  SimpleConfiguration getConnectionConfig();
+
+  /**
+   * @return SimpleConfiguration containing application-specific configuration stored in Zookeeper
+   * @since 1.2.0
+   */
+  SimpleConfiguration getApplicationConfig();
+
   @Override
   void close();
 }
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java b/modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
index 8c5daf9..52b3843 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
@@ -55,10 +55,9 @@ public class FluoFactory {
   /**
    * Creates a {@link FluoClient} for reading and writing data to Fluo. {@link FluoClient#close()}
    * should be called when you are finished using it. Configuration (see {@link FluoConfiguration})
-   * should contain properties with client.* prefix. Please review all client.* properties but many
-   * have a default. At a minimum, configuration should contain the following properties that have
-   * no default: fluo.client.accumulo.user, fluo.client.accumulo.password,
-   * fluo.client.accumulo.instance
+   * should contain properties with connection.* prefix. Please review all connection.* properties
+   * but many have a default. At a minimum, configuration should contain the following properties
+   * that have no default: fluo.connection.application.name
    */
   public static FluoClient newClient(SimpleConfiguration configuration) {
     return getAndBuildClassWithConfig(configuration, CLIENT_CLASS_PROP, CLIENT_CLASS_DEFAULT);
@@ -66,11 +65,10 @@ public class FluoFactory {
 
   /**
    * Creates a {@link FluoAdmin} client for administering Fluo. Configuration (see
-   * {@link FluoConfiguration}) should contain properties with client.* and admin.* prefix. Please
-   * review all properties but many have a default. At a minimum, configuration should contain the
-   * following properties that have no default: fluo.client.accumulo.user,
-   * fluo.client.accumulo.password, fluo.client.accumulo.instance, fluo.admin.accumulo.table,
-   * fluo.admin.accumulo.classpath
+   * {@link FluoConfiguration}) should contain all Fluo configuration properties. Review all
+   * properties but many have a default. At a minimum, configuration should contain the following
+   * properties that have no default: fluo.connection.application.name, fluo.accumulo.user,
+   * fluo.accumulo.password, fluo.accumulo.instance, fluo.accumulo.table, fluo.accumulo.classpath
    */
   public static FluoAdmin newAdmin(SimpleConfiguration configuration) {
     return getAndBuildClassWithConfig(configuration, ADMIN_CLASS_PROP, ADMIN_CLASS_DEFAULT);
@@ -88,14 +86,20 @@ public class FluoFactory {
   }
 
   /**
-   * Creates a {@link FluoOracle} using the provided configuration.
+   * Creates a {@link FluoOracle}. Configuration (see {@link FluoConfiguration}) should contain
+   * properties with connection.* prefix. Please review all connection.* properties but many have a
+   * default. At a minimum, configuration should contain the following properties that have no
+   * default: fluo.connection.application.name
    */
   public static FluoOracle newOracle(SimpleConfiguration configuration) {
     return getAndBuildClassWithConfig(configuration, ORACLE_CLASS_PROP, ORACLE_CLASS_DEFAULT);
   }
 
   /**
-   * Creates a {@link FluoWorker} using the provided configuration.
+   * Creates a {@link FluoWorker}. Configuration (see {@link FluoConfiguration}) should contain
+   * properties with connection.* prefix. Please review all connection.* properties but many have a
+   * default. At a minimum, configuration should contain the following properties that have no
+   * default: fluo.connection.application.name
    */
   public static FluoWorker newWorker(SimpleConfiguration configuration) {
     return getAndBuildClassWithConfig(configuration, WORKER_CLASS_PROP, WORKER_CLASS_DEFAULT);
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
index 782393b..54e07d2 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
@@ -47,39 +48,151 @@ public class FluoConfiguration extends SimpleConfiguration {
 
   // Client properties
   private static final String CLIENT_PREFIX = FLUO_PREFIX + ".client";
+  /**
+   * @deprecated since 1.2.0 replaced by fluo.connection.application.name
+   */
+  @Deprecated
   public static final String CLIENT_APPLICATION_NAME_PROP = CLIENT_PREFIX + ".application.name";
+  /**
+   * @deprecated since 1.2.0 replaced by fluo.accumulo.password
+   */
+  @Deprecated
   public static final String CLIENT_ACCUMULO_PASSWORD_PROP = CLIENT_PREFIX + ".accumulo.password";
+  /**
+   * @deprecated since 1.2.0 replaced by fluo.accumulo.user
+   */
+  @Deprecated
   public static final String CLIENT_ACCUMULO_USER_PROP = CLIENT_PREFIX + ".accumulo.user";
+  /**
+   * @deprecated since 1.2.0 replaced by fluo.accumulo.instance
+   */
+  @Deprecated
   public static final String CLIENT_ACCUMULO_INSTANCE_PROP = CLIENT_PREFIX + ".accumulo.instance";
+  /**
+   * @deprecated since 1.2.0 replaced by fluo.accumulo.zookeepers
+   */
+  @Deprecated
   public static final String CLIENT_ACCUMULO_ZOOKEEPERS_PROP = CLIENT_PREFIX
       + ".accumulo.zookeepers";
+  /**
+   * @deprecated since 1.2.0 replaced by fluo.connection.zookeeper.timeout
+   */
+  @Deprecated
   public static final String CLIENT_ZOOKEEPER_TIMEOUT_PROP = CLIENT_PREFIX + ".zookeeper.timeout";
+  /**
+   * @deprecated since 1.2.0 replaced by fluo.connection.zookeepers
+   */
+  @Deprecated
   public static final String CLIENT_ZOOKEEPER_CONNECT_PROP = CLIENT_PREFIX + ".zookeeper.connect";
+  /**
+   * @deprecated since 1.2.0 replaced by fluo.connection.retry.timeout.ms
+   */
+  @Deprecated
   public static final String CLIENT_RETRY_TIMEOUT_MS_PROP = CLIENT_PREFIX + ".retry.timeout.ms";
+  @Deprecated
   public static final int CLIENT_ZOOKEEPER_TIMEOUT_DEFAULT = 30000;
+  @Deprecated
   public static final String CLIENT_ACCUMULO_ZOOKEEPERS_DEFAULT = "localhost";
+  @Deprecated
   public static final String CLIENT_ZOOKEEPER_CONNECT_DEFAULT = "localhost/fluo";
+  @Deprecated
   public static final int CLIENT_RETRY_TIMEOUT_MS_DEFAULT = -1;
 
-  // Administration
+  // Connection properties
+  /**
+   * @since 1.2.0
+   */
+  public static final String CONNECTION_PREFIX = FLUO_PREFIX + ".connection";
+  /**
+   * @since 1.2.0
+   */
+  public static final String CONNECTION_APPLICATION_NAME_PROP = CONNECTION_PREFIX
+      + ".application.name";
+  /**
+   * @since 1.2.0
+   */
+  public static final String CONNECTION_ZOOKEEPER_TIMEOUT_PROP = CONNECTION_PREFIX
+      + ".zookeeper.timeout";
+  /**
+   * @since 1.2.0
+   */
+  public static final String CONNECTION_ZOOKEEPERS_PROP = CONNECTION_PREFIX + ".zookeepers";
+  /**
+   * @since 1.2.0
+   */
+  public static final String CONNECTION_RETRY_TIMEOUT_MS_PROP = CONNECTION_PREFIX
+      + ".retry.timeout.ms";
+  public static final int CONNECTION_ZOOKEEPER_TIMEOUT_DEFAULT = 30000;
+  public static final String CONNECTION_ZOOKEEPERS_DEFAULT = "localhost/fluo";
+  public static final int CONNECTION_RETRY_TIMEOUT_MS_DEFAULT = -1;
+
+  // Accumulo properties
+  private static final String ACCUMULO_PREFIX = FLUO_PREFIX + ".accumulo";
+  /**
+   * @since 1.2.0
+   */
+  public static final String ACCUMULO_INSTANCE_PROP = ACCUMULO_PREFIX + ".instance";
+  /**
+   * @since 1.2.0
+   */
+  public static final String ACCUMULO_TABLE_PROP = ACCUMULO_PREFIX + ".table";
+  /**
+   * @since 1.2.0
+   */
+  public static final String ACCUMULO_PASSWORD_PROP = ACCUMULO_PREFIX + ".password";
+  /**
+   * @since 1.2.0
+   */
+  public static final String ACCUMULO_USER_PROP = ACCUMULO_PREFIX + ".user";
+  /**
+   * @since 1.2.0
+   */
+  public static final String ACCUMULO_ZOOKEEPERS_PROP = ACCUMULO_PREFIX + ".zookeepers";
+  /**
+   * @since 1.2.0
+   */
+  public static final String ACCUMULO_JARS_PROP = ACCUMULO_PREFIX + ".jars";
+  // Accumulo defaults
+  public static final String ACCUMULO_ZOOKEEPERS_DEFAULT = "localhost";
+  public static final String ACCUMULO_JARS_DEFAULT = "";
+
+  // DFS properties
+  private static final String DFS_PREFIX = FLUO_PREFIX + ".dfs";
+  /**
+   * @since 1.2.0
+   */
+  public static final String DFS_ROOT_PROP = DFS_PREFIX + ".root";
+  // DFS defaults
+  public static final String DFS_ROOT_DEFAULT = "hdfs://localhost:8020/fluo";
+
+  // Administration properties
   private static final String ADMIN_PREFIX = FLUO_PREFIX + ".admin";
+  /**
+   * @deprecated since 1.2.0 replaced by fluo.accumulo.table
+   */
+  @Deprecated
   public static final String ADMIN_ACCUMULO_TABLE_PROP = ADMIN_PREFIX + ".accumulo.table";
+  /**
+   * @deprecated since 1.2.0 replaced by fluo.observer.init.dir and fluo.observer.jars.url
+   */
+  @Deprecated
   public static final String ADMIN_ACCUMULO_CLASSPATH_PROP = ADMIN_PREFIX + ".accumulo.classpath";
+  @Deprecated
   public static final String ADMIN_ACCUMULO_CLASSPATH_DEFAULT = "";
 
-  // Worker
+  // Worker properties
   private static final String WORKER_PREFIX = FLUO_PREFIX + ".worker";
   public static final String WORKER_NUM_THREADS_PROP = WORKER_PREFIX + ".num.threads";
   public static final int WORKER_NUM_THREADS_DEFAULT = 10;
 
-  // Loader
+  // Loader properties
   private static final String LOADER_PREFIX = FLUO_PREFIX + ".loader";
   public static final String LOADER_NUM_THREADS_PROP = LOADER_PREFIX + ".num.threads";
   public static final String LOADER_QUEUE_SIZE_PROP = LOADER_PREFIX + ".queue.size";
   public static final int LOADER_NUM_THREADS_DEFAULT = 10;
   public static final int LOADER_QUEUE_SIZE_DEFAULT = 10;
 
-  // MiniFluo
+  // MiniFluo properties
   private static final String MINI_PREFIX = FLUO_PREFIX + ".mini";
   public static final String MINI_START_ACCUMULO_PROP = MINI_PREFIX + ".start.accumulo";
   public static final String MINI_DATA_DIR_PROP = MINI_PREFIX + ".data.dir";
@@ -87,19 +200,24 @@ public class FluoConfiguration extends SimpleConfiguration {
   public static final String MINI_DATA_DIR_DEFAULT = "${env:FLUO_HOME}/mini";
 
   /** The properties below get loaded into/from Zookeeper */
-  // Observer
-  @Deprecated
+  // Observer properties
   public static final String OBSERVER_PREFIX = FLUO_PREFIX + ".observer.";
-
   /**
    * @since 1.1.0
    */
   public static final String OBSERVER_PROVIDER = FLUO_PREFIX + ".observer.provider";
-
   /**
-   * @since 1.1.0
+   * @since 1.2.0
+   */
+  public static final String OBSERVER_INIT_DIR_PROP = FLUO_PREFIX + ".observer.init.dir";
+  /**
+   * @since 1.2.0
    */
+  public static final String OBSERVER_JARS_URL_PROP = FLUO_PREFIX + ".observer.jars.url";
+  // Observer defaults
   public static final String OBSERVER_PROVIDER_DEFAULT = "";
+  public static final String OBSERVER_INIT_DIR_DEFAULT = "";
+  public static final String OBSERVER_JARS_URL_DEFAULT = "";
 
   // Transaction
   public static final String TRANSACTION_PREFIX = FLUO_PREFIX + ".tx";
@@ -109,7 +227,7 @@ public class FluoConfiguration extends SimpleConfiguration {
   // Metrics
   public static final String REPORTER_PREFIX = FLUO_PREFIX + ".metrics.reporter";
 
-  // application config
+  // Application config
   public static final String APP_PREFIX = FLUO_PREFIX + ".app";
 
   public FluoConfiguration() {
@@ -134,7 +252,6 @@ public class FluoConfiguration extends SimpleConfiguration {
 
   public void validate() {
     // keep in alphabetical order
-    getAccumuloClasspath();
     getAccumuloInstance();
     getAccumuloPassword();
     getAccumuloTable();
@@ -142,7 +259,7 @@ public class FluoConfiguration extends SimpleConfiguration {
     getAccumuloZookeepers();
     getApplicationName();
     getAppZookeepers();
-    getClientRetryTimeout();
+    getConnectionRetryTimeout();
     getLoaderQueueSize();
     getLoaderThreads();
     getObserverSpecifications();
@@ -153,12 +270,19 @@ public class FluoConfiguration extends SimpleConfiguration {
 
   public FluoConfiguration setApplicationName(String applicationName) {
     verifyApplicationName(applicationName);
-    setProperty(CLIENT_APPLICATION_NAME_PROP, applicationName);
+    setProperty(CONNECTION_APPLICATION_NAME_PROP, applicationName);
     return this;
   }
 
   public String getApplicationName() {
-    String applicationName = getString(CLIENT_APPLICATION_NAME_PROP);
+    String applicationName;
+    if (containsKey(CONNECTION_APPLICATION_NAME_PROP)) {
+      applicationName = getString(CONNECTION_APPLICATION_NAME_PROP);
+    } else if (containsKey(CLIENT_APPLICATION_NAME_PROP)) {
+      applicationName = getString(CLIENT_APPLICATION_NAME_PROP);
+    } else {
+      throw new NoSuchElementException(CONNECTION_APPLICATION_NAME_PROP + " was not set");
+    }
     verifyApplicationName(applicationName);
     return applicationName;
   }
@@ -200,11 +324,12 @@ public class FluoConfiguration extends SimpleConfiguration {
   }
 
   public FluoConfiguration setInstanceZookeepers(String zookeepers) {
-    return setNonEmptyString(CLIENT_ZOOKEEPER_CONNECT_PROP, zookeepers);
+    return setNonEmptyString(CONNECTION_ZOOKEEPERS_PROP, zookeepers);
   }
 
   public String getInstanceZookeepers() {
-    return getNonEmptyString(CLIENT_ZOOKEEPER_CONNECT_PROP, CLIENT_ZOOKEEPER_CONNECT_DEFAULT);
+    return getDepNonEmptyString(CONNECTION_ZOOKEEPERS_PROP, CLIENT_ZOOKEEPER_CONNECT_PROP,
+        CONNECTION_ZOOKEEPERS_DEFAULT);
   }
 
   public String getAppZookeepers() {
@@ -212,57 +337,85 @@ public class FluoConfiguration extends SimpleConfiguration {
   }
 
   public FluoConfiguration setZookeeperTimeout(int timeout) {
-    return setPositiveInt(CLIENT_ZOOKEEPER_TIMEOUT_PROP, timeout);
+    return setPositiveInt(CONNECTION_ZOOKEEPER_TIMEOUT_PROP, timeout);
   }
 
   public int getZookeeperTimeout() {
-    return getPositiveInt(CLIENT_ZOOKEEPER_TIMEOUT_PROP, CLIENT_ZOOKEEPER_TIMEOUT_DEFAULT);
+    return getDepPositiveInt(CONNECTION_ZOOKEEPER_TIMEOUT_PROP, CLIENT_ZOOKEEPER_TIMEOUT_PROP,
+        CONNECTION_ZOOKEEPER_TIMEOUT_DEFAULT);
   }
 
-  public FluoConfiguration setClientRetryTimeout(int timeoutMS) {
-    Preconditions.checkArgument(timeoutMS >= -1, CLIENT_RETRY_TIMEOUT_MS_PROP + " must be >= -1");
-    setProperty(CLIENT_RETRY_TIMEOUT_MS_PROP, timeoutMS);
-    return this;
+  @Deprecated
+  public FluoConfiguration setClientRetryTimeout(int timeoutMs) {
+    return setConnectionRetryTimeout(timeoutMs);
   }
 
+  @Deprecated
   public int getClientRetryTimeout() {
-    int retval = getInt(CLIENT_RETRY_TIMEOUT_MS_PROP, CLIENT_RETRY_TIMEOUT_MS_DEFAULT);
-    Preconditions.checkArgument(retval >= -1, CLIENT_RETRY_TIMEOUT_MS_PROP + " must be >= -1");
+    return getConnectionRetryTimeout();
+  }
+
+  /**
+   * @since 1.2.0
+   */
+  public FluoConfiguration setConnectionRetryTimeout(int timeoutMS) {
+    Preconditions.checkArgument(timeoutMS >= -1, CONNECTION_RETRY_TIMEOUT_MS_PROP
+        + " must be >= -1");
+    setProperty(CONNECTION_RETRY_TIMEOUT_MS_PROP, timeoutMS);
+    return this;
+  }
+
+  /**
+   * @since 1.2.0
+   */
+  public int getConnectionRetryTimeout() {
+    int retval;
+    if (containsKey(CONNECTION_RETRY_TIMEOUT_MS_PROP)) {
+      retval = getInt(CONNECTION_RETRY_TIMEOUT_MS_PROP, CONNECTION_RETRY_TIMEOUT_MS_DEFAULT);
+    } else {
+      retval = getInt(CLIENT_RETRY_TIMEOUT_MS_PROP, CONNECTION_RETRY_TIMEOUT_MS_DEFAULT);
+    }
+    Preconditions.checkArgument(retval >= -1, CONNECTION_RETRY_TIMEOUT_MS_PROP + " must be >= -1");
     return retval;
   }
 
   public FluoConfiguration setAccumuloInstance(String accumuloInstance) {
-    return setNonEmptyString(CLIENT_ACCUMULO_INSTANCE_PROP, accumuloInstance);
+    return setNonEmptyString(ACCUMULO_INSTANCE_PROP, accumuloInstance);
   }
 
   public String getAccumuloInstance() {
-    return getNonEmptyString(CLIENT_ACCUMULO_INSTANCE_PROP);
+    return getDepNonEmptyString(ACCUMULO_INSTANCE_PROP, CLIENT_ACCUMULO_INSTANCE_PROP);
   }
 
   public FluoConfiguration setAccumuloUser(String accumuloUser) {
-    return setNonEmptyString(CLIENT_ACCUMULO_USER_PROP, accumuloUser);
+    return setNonEmptyString(ACCUMULO_USER_PROP, accumuloUser);
   }
 
   public String getAccumuloUser() {
-    return getNonEmptyString(CLIENT_ACCUMULO_USER_PROP);
+    return getDepNonEmptyString(ACCUMULO_USER_PROP, CLIENT_ACCUMULO_USER_PROP);
   }
 
   public FluoConfiguration setAccumuloPassword(String accumuloPassword) {
-    setProperty(CLIENT_ACCUMULO_PASSWORD_PROP,
-        verifyNotNull(CLIENT_ACCUMULO_PASSWORD_PROP, accumuloPassword));
+    setProperty(ACCUMULO_PASSWORD_PROP, verifyNotNull(ACCUMULO_PASSWORD_PROP, accumuloPassword));
     return this;
   }
 
   public String getAccumuloPassword() {
-    return verifyNotNull(CLIENT_ACCUMULO_PASSWORD_PROP, getString(CLIENT_ACCUMULO_PASSWORD_PROP));
+    if (containsKey(ACCUMULO_PASSWORD_PROP)) {
+      return verifyNotNull(ACCUMULO_PASSWORD_PROP, getString(ACCUMULO_PASSWORD_PROP));
+    } else if (containsKey(CLIENT_ACCUMULO_PASSWORD_PROP)) {
+      return verifyNotNull(CLIENT_ACCUMULO_PASSWORD_PROP, getString(CLIENT_ACCUMULO_PASSWORD_PROP));
+    }
+    throw new NoSuchElementException(ACCUMULO_PASSWORD_PROP + " is not set!");
   }
 
   public FluoConfiguration setAccumuloZookeepers(String zookeepers) {
-    return setNonEmptyString(CLIENT_ACCUMULO_ZOOKEEPERS_PROP, zookeepers);
+    return setNonEmptyString(ACCUMULO_ZOOKEEPERS_PROP, zookeepers);
   }
 
   public String getAccumuloZookeepers() {
-    return getNonEmptyString(CLIENT_ACCUMULO_ZOOKEEPERS_PROP, CLIENT_ACCUMULO_ZOOKEEPERS_DEFAULT);
+    return getDepNonEmptyString(ACCUMULO_ZOOKEEPERS_PROP, CLIENT_ACCUMULO_ZOOKEEPERS_PROP,
+        ACCUMULO_ZOOKEEPERS_DEFAULT);
   }
 
   /**
@@ -270,22 +423,58 @@ public class FluoConfiguration extends SimpleConfiguration {
    * retrieved from Zookeeper for clients.
    */
   public FluoConfiguration setAccumuloTable(String table) {
-    return setNonEmptyString(ADMIN_ACCUMULO_TABLE_PROP, table);
+    return setNonEmptyString(ACCUMULO_TABLE_PROP, table);
   }
 
   public String getAccumuloTable() {
-    return getNonEmptyString(ADMIN_ACCUMULO_TABLE_PROP);
+    return getDepNonEmptyString(ACCUMULO_TABLE_PROP, ADMIN_ACCUMULO_TABLE_PROP);
   }
 
+  @Deprecated
   public FluoConfiguration setAccumuloClasspath(String path) {
     setProperty(ADMIN_ACCUMULO_CLASSPATH_PROP, verifyNotNull(ADMIN_ACCUMULO_CLASSPATH_PROP, path));
     return this;
   }
 
+  @Deprecated
   public String getAccumuloClasspath() {
     return getString(ADMIN_ACCUMULO_CLASSPATH_PROP, ADMIN_ACCUMULO_CLASSPATH_DEFAULT);
   }
 
+  /**
+   * Sets paths to jars to provide to Accumulo. If not set, Fluo will find jars on classpath
+   *
+   * @param path CSV list of paths
+   * @since 1.2.0
+   */
+  public FluoConfiguration setAccumuloJars(String path) {
+    setProperty(ACCUMULO_JARS_PROP, verifyNotNull(ACCUMULO_JARS_PROP, path));
+    return this;
+  }
+
+  /**
+   * Gets CSV list of jar paths to provide to Accumulo
+   *
+   * @since 1.2.0
+   */
+  public String getAccumuloJars() {
+    return getString(ACCUMULO_JARS_PROP, ACCUMULO_JARS_DEFAULT);
+  }
+
+  /**
+   * @since 1.2.0
+   */
+  public FluoConfiguration setDfsRoot(String dfsRoot) {
+    return setNonEmptyString(DFS_ROOT_PROP, dfsRoot);
+  }
+
+  /**
+   * @since 1.2.0
+   */
+  public String getDfsRoot() {
+    return getNonEmptyString(DFS_ROOT_PROP, DFS_ROOT_DEFAULT);
+  }
+
   public FluoConfiguration setWorkerThreads(int numThreads) {
     return setPositiveInt(WORKER_NUM_THREADS_PROP, numThreads);
   }
@@ -307,7 +496,9 @@ public class FluoConfiguration extends SimpleConfiguration {
     while (iter.hasNext()) {
       String key = iter.next();
       if (key.startsWith(FluoConfiguration.OBSERVER_PREFIX)
-          && !key.equals(FluoConfiguration.OBSERVER_PROVIDER)) {
+          && !key.equals(FluoConfiguration.OBSERVER_PROVIDER)
+          && !key.equals(FluoConfiguration.OBSERVER_INIT_DIR_PROP)
+          && !key.equals(FluoConfiguration.OBSERVER_JARS_URL_PROP)) {
         String value = getString(key).trim();
 
         if (value.isEmpty()) {
@@ -385,6 +576,45 @@ public class FluoConfiguration extends SimpleConfiguration {
   }
 
   /**
+   * Sets directory where observers jars can found for initialization
+   *
+   * @param observerDir Path to directory
+   * @since 1.2.0
+   */
+  public FluoConfiguration setObserverInitDir(String observerDir) {
+    setProperty(OBSERVER_INIT_DIR_PROP, verifyNotNull(OBSERVER_INIT_DIR_PROP, observerDir));
+    return this;
+  }
+
+  /**
+   * Gets directory where observer jars can be found for initialization
+   *
+   * @return Path to directory
+   * @since 1.2.0
+   */
+  public String getObserverInitDir() {
+    return getString(OBSERVER_INIT_DIR_PROP, OBSERVER_INIT_DIR_DEFAULT);
+  }
+
+  /**
+   * Sets URL to directory where observer jars can be found
+   *
+   * @param observerJarsUrl URL to observer jars directory
+   * @since 1.2.0
+   */
+  public FluoConfiguration setObserverJarsUrl(String observerJarsUrl) {
+    setProperty(OBSERVER_JARS_URL_PROP, verifyNotNull(OBSERVER_JARS_URL_PROP, observerJarsUrl));
+    return this;
+  }
+
+  /**
+   * @since 1.2.0
+   */
+  public String getObserverJarsUrl() {
+    return getString(OBSERVER_JARS_URL_PROP, OBSERVER_JARS_URL_DEFAULT);
+  }
+
+  /**
    * @return The configured {@link ObserverProvider} class name. If one was not configured, returns
    *         {@value #OBSERVER_PROVIDER_DEFAULT}
    * @since 1.1.0
@@ -525,31 +755,41 @@ public class FluoConfiguration extends SimpleConfiguration {
     }
   }
 
-  private boolean verifyStringPropSet(String key) {
-    if (containsKey(key) && !getString(key).isEmpty()) {
-      return true;
+  private boolean verifyStringPropSet(String... keys) {
+    for (String key : keys) {
+      if (containsKey(key) && !getString(key).isEmpty()) {
+        return true;
+      }
     }
-    log.info(key + " is not set");
+    log.info(keys[0] + " is not set");
     return false;
   }
 
-  private boolean verifyStringPropNotSet(String key) {
-    if (containsKey(key) && !getString(key).isEmpty()) {
-      log.info(key + " should not be set");
-      return false;
+  private boolean verifyStringPropNotSet(String... keys) {
+    for (String key : keys) {
+      if (containsKey(key) && !getString(key).isEmpty()) {
+        log.info(key + " should not be set");
+        return false;
+      }
     }
     return true;
   }
 
+  public boolean hasRequiredConnectionProps() {
+    boolean valid = true;
+    valid &= verifyStringPropSet(CONNECTION_APPLICATION_NAME_PROP, CLIENT_APPLICATION_NAME_PROP);
+    return valid;
+  }
+
   /**
    * Returns true if required properties for FluoClient are set
    */
   public boolean hasRequiredClientProps() {
     boolean valid = true;
-    valid &= verifyStringPropSet(CLIENT_APPLICATION_NAME_PROP);
-    valid &= verifyStringPropSet(CLIENT_ACCUMULO_USER_PROP);
-    valid &= verifyStringPropSet(CLIENT_ACCUMULO_PASSWORD_PROP);
-    valid &= verifyStringPropSet(CLIENT_ACCUMULO_INSTANCE_PROP);
+    valid &= verifyStringPropSet(CONNECTION_APPLICATION_NAME_PROP, CLIENT_APPLICATION_NAME_PROP);
+    valid &= verifyStringPropSet(ACCUMULO_USER_PROP, CLIENT_ACCUMULO_USER_PROP);
+    valid &= verifyStringPropSet(ACCUMULO_PASSWORD_PROP, CLIENT_ACCUMULO_PASSWORD_PROP);
+    valid &= verifyStringPropSet(ACCUMULO_INSTANCE_PROP, CLIENT_ACCUMULO_INSTANCE_PROP);
     return valid;
   }
 
@@ -559,7 +799,7 @@ public class FluoConfiguration extends SimpleConfiguration {
   public boolean hasRequiredAdminProps() {
     boolean valid = true;
     valid &= hasRequiredClientProps();
-    valid &= verifyStringPropSet(ADMIN_ACCUMULO_TABLE_PROP);
+    valid &= verifyStringPropSet(ACCUMULO_TABLE_PROP, ADMIN_ACCUMULO_TABLE_PROP);
     return valid;
   }
 
@@ -588,11 +828,11 @@ public class FluoConfiguration extends SimpleConfiguration {
     boolean valid = true;
     if (getMiniStartAccumulo()) {
       // ensure that client properties are not set since we are using MiniAccumulo
-      valid &= verifyStringPropNotSet(CLIENT_ACCUMULO_USER_PROP);
-      valid &= verifyStringPropNotSet(CLIENT_ACCUMULO_PASSWORD_PROP);
-      valid &= verifyStringPropNotSet(CLIENT_ACCUMULO_INSTANCE_PROP);
-      valid &= verifyStringPropNotSet(CLIENT_ACCUMULO_ZOOKEEPERS_PROP);
-      valid &= verifyStringPropNotSet(CLIENT_ZOOKEEPER_CONNECT_PROP);
+      valid &= verifyStringPropNotSet(ACCUMULO_USER_PROP, CLIENT_ACCUMULO_USER_PROP);
+      valid &= verifyStringPropNotSet(ACCUMULO_PASSWORD_PROP, CLIENT_ACCUMULO_PASSWORD_PROP);
+      valid &= verifyStringPropNotSet(ACCUMULO_INSTANCE_PROP, CLIENT_ACCUMULO_INSTANCE_PROP);
+      valid &= verifyStringPropNotSet(ACCUMULO_ZOOKEEPERS_PROP, CLIENT_ACCUMULO_ZOOKEEPERS_PROP);
+      valid &= verifyStringPropNotSet(CONNECTION_ZOOKEEPERS_PROP, CLIENT_ZOOKEEPER_CONNECT_PROP);
       if (valid == false) {
         log.error("Client properties should not be set in your configuration if MiniFluo is "
             + "configured to start its own accumulo (indicated by fluo.mini.start.accumulo being "
@@ -612,7 +852,8 @@ public class FluoConfiguration extends SimpleConfiguration {
     Iterator<String> iter = getKeys();
     while (iter.hasNext()) {
       String key = iter.next();
-      if (key.startsWith(CLIENT_PREFIX)) {
+      if (key.startsWith(CONNECTION_PREFIX) || key.startsWith(ACCUMULO_PREFIX)
+          || key.startsWith(CLIENT_PREFIX)) {
         clientConfig.setProperty(key, getRawString(key));
       }
     }
@@ -634,9 +875,10 @@ public class FluoConfiguration extends SimpleConfiguration {
    * not have defaults and will not be set.
    */
   public static void setDefaultConfiguration(SimpleConfiguration config) {
-    config.setProperty(CLIENT_ZOOKEEPER_CONNECT_PROP, CLIENT_ZOOKEEPER_CONNECT_DEFAULT);
-    config.setProperty(CLIENT_ZOOKEEPER_TIMEOUT_PROP, CLIENT_ZOOKEEPER_TIMEOUT_DEFAULT);
-    config.setProperty(CLIENT_ACCUMULO_ZOOKEEPERS_PROP, CLIENT_ACCUMULO_ZOOKEEPERS_DEFAULT);
+    config.setProperty(CONNECTION_ZOOKEEPERS_PROP, CONNECTION_ZOOKEEPERS_DEFAULT);
+    config.setProperty(CONNECTION_ZOOKEEPER_TIMEOUT_PROP, CONNECTION_ZOOKEEPER_TIMEOUT_DEFAULT);
+    config.setProperty(DFS_ROOT_PROP, DFS_ROOT_DEFAULT);
+    config.setProperty(ACCUMULO_ZOOKEEPERS_PROP, ACCUMULO_ZOOKEEPERS_DEFAULT);
     config.setProperty(WORKER_NUM_THREADS_PROP, WORKER_NUM_THREADS_DEFAULT);
     config.setProperty(TRANSACTION_ROLLBACK_TIME_PROP, TRANSACTION_ROLLBACK_TIME_DEFAULT);
     config.setProperty(LOADER_NUM_THREADS_PROP, LOADER_NUM_THREADS_DEFAULT);
@@ -669,6 +911,14 @@ public class FluoConfiguration extends SimpleConfiguration {
     return value;
   }
 
+  private int getDepPositiveInt(String property, String depProperty, int defaultValue) {
+    if (containsKey(property)) {
+      return getInt(property, defaultValue);
+    } else {
+      return getInt(depProperty, defaultValue);
+    }
+  }
+
   private FluoConfiguration setPositiveLong(String property, long value) {
     Preconditions.checkArgument(value > 0, property + " must be positive");
     setProperty(property, value);
@@ -702,6 +952,21 @@ public class FluoConfiguration extends SimpleConfiguration {
     return value;
   }
 
+  private String getDepNonEmptyString(String property, String depProperty, String defaultValue) {
+    return containsKey(property) ? getNonEmptyString(property, defaultValue) : getNonEmptyString(
+        depProperty, defaultValue);
+  }
+
+  private String getDepNonEmptyString(String property, String depProperty) {
+    if (containsKey(property)) {
+      return getNonEmptyString(property);
+    } else if (containsKey(depProperty)) {
+      return getNonEmptyString(depProperty);
+    } else {
+      throw new NoSuchElementException(property + " is not set!");
+    }
+  }
+
   private static String verifyNotNull(String property, String value) {
     Objects.requireNonNull(value, property + " cannot be null");
     return value;
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java
index aa4c1eb..f54e3f5 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/SimpleConfiguration.java
@@ -60,18 +60,6 @@ public class SimpleConfiguration implements Serializable {
     internalConfig = compositeConfig;
   }
 
-  private void load(InputStream in) {
-    try {
-      PropertiesConfiguration config = new PropertiesConfiguration();
-      // disabled to prevent accumulo classpath value from being shortened
-      config.setDelimiterParsingDisabled(true);
-      config.load(in);
-      ((CompositeConfiguration) internalConfig).addConfiguration(config);
-    } catch (ConfigurationException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
   public SimpleConfiguration() {
     init();
   }
@@ -85,15 +73,7 @@ public class SimpleConfiguration implements Serializable {
    */
   public SimpleConfiguration(File propertiesFile) {
     this();
-    try {
-      PropertiesConfiguration config = new PropertiesConfiguration();
-      // disabled to prevent accumulo classpath value from being shortened
-      config.setDelimiterParsingDisabled(true);
-      config.load(propertiesFile);
-      ((CompositeConfiguration) internalConfig).addConfiguration(config);
-    } catch (ConfigurationException e) {
-      throw new IllegalArgumentException(e);
-    }
+    load(propertiesFile);
   }
 
   /**
@@ -186,6 +166,42 @@ public class SimpleConfiguration implements Serializable {
     return internalConfig.getString(key, defaultValue);
   }
 
+  /**
+   * Loads configuration from InputStream. Later loads have lower priority.
+   * 
+   * @param in InputStream to load from
+   * @since 1.2.0
+   */
+  public void load(InputStream in) {
+    try {
+      PropertiesConfiguration config = new PropertiesConfiguration();
+      // disabled to prevent accumulo classpath value from being shortened
+      config.setDelimiterParsingDisabled(true);
+      config.load(in);
+      ((CompositeConfiguration) internalConfig).addConfiguration(config);
+    } catch (ConfigurationException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  /**
+   * Loads configuration from File. Later loads have lower priority.
+   * 
+   * @param file File to load from
+   * @since 1.2.0
+   */
+  public void load(File file) {
+    try {
+      PropertiesConfiguration config = new PropertiesConfiguration();
+      // disabled to prevent accumulo classpath value from being shortened
+      config.setDelimiterParsingDisabled(true);
+      config.load(file);
+      ((CompositeConfiguration) internalConfig).addConfiguration(config);
+    } catch (ConfigurationException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
   public void save(File file) {
     PropertiesConfiguration pconf = new PropertiesConfiguration();
     pconf.setDelimiterParsingDisabled(true);
diff --git a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
index 3e36ba1..c4888d9 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
@@ -41,14 +41,14 @@ public class FluoConfigurationTest {
 
   @Test
   public void testDefaults() {
-    Assert.assertEquals(FluoConfiguration.CLIENT_ZOOKEEPER_CONNECT_DEFAULT,
+    Assert.assertEquals(FluoConfiguration.CONNECTION_ZOOKEEPERS_DEFAULT,
         base.getInstanceZookeepers());
-    Assert.assertEquals(FluoConfiguration.CLIENT_ZOOKEEPER_TIMEOUT_DEFAULT,
+    Assert.assertEquals(FluoConfiguration.CONNECTION_ZOOKEEPER_TIMEOUT_DEFAULT,
         base.getZookeeperTimeout());
-    Assert.assertEquals(FluoConfiguration.CLIENT_RETRY_TIMEOUT_MS_DEFAULT,
-        base.getClientRetryTimeout());
-    Assert.assertEquals(FluoConfiguration.CLIENT_ACCUMULO_ZOOKEEPERS_DEFAULT,
-        base.getAccumuloZookeepers());
+    Assert.assertEquals(FluoConfiguration.CONNECTION_RETRY_TIMEOUT_MS_DEFAULT,
+        base.getConnectionRetryTimeout());
+    Assert
+        .assertEquals(FluoConfiguration.ACCUMULO_ZOOKEEPERS_DEFAULT, base.getAccumuloZookeepers());
     Assert.assertEquals(FluoConfiguration.ADMIN_ACCUMULO_CLASSPATH_DEFAULT,
         base.getAccumuloClasspath());
     Assert.assertEquals(FluoConfiguration.WORKER_NUM_THREADS_DEFAULT, base.getWorkerThreads());
@@ -58,6 +58,9 @@ public class FluoConfigurationTest {
     Assert.assertEquals(FluoConfiguration.LOADER_QUEUE_SIZE_DEFAULT, base.getLoaderQueueSize());
     Assert.assertEquals(FluoConfiguration.MINI_START_ACCUMULO_DEFAULT, base.getMiniStartAccumulo());
     Assert.assertTrue(base.getMiniDataDir().endsWith("/mini"));
+    Assert.assertEquals(FluoConfiguration.OBSERVER_INIT_DIR_DEFAULT, base.getObserverInitDir());
+    Assert.assertEquals(FluoConfiguration.OBSERVER_JARS_URL_DEFAULT, base.getObserverJarsUrl());
+    Assert.assertEquals(FluoConfiguration.DFS_ROOT_DEFAULT, base.getDfsRoot());
   }
 
   @Test(expected = NoSuchElementException.class)
@@ -85,6 +88,7 @@ public class FluoConfigurationTest {
     FluoConfiguration config = new FluoConfiguration();
     Assert.assertEquals("path1,path2", config.setAccumuloClasspath("path1,path2")
         .getAccumuloClasspath());
+    Assert.assertEquals("path1,path2", config.setAccumuloJars("path1,path2").getAccumuloJars());
     Assert.assertEquals("instance", config.setAccumuloInstance("instance").getAccumuloInstance());
     Assert.assertEquals("pass", config.setAccumuloPassword("pass").getAccumuloPassword());
     Assert.assertEquals("table", config.setAccumuloTable("table").getAccumuloTable());
@@ -101,7 +105,12 @@ public class FluoConfigurationTest {
     Assert.assertEquals(14, config.setZookeeperTimeout(14).getZookeeperTimeout());
     Assert.assertFalse(config.setMiniStartAccumulo(false).getMiniStartAccumulo());
     Assert.assertEquals("mydata", config.setMiniDataDir("mydata").getMiniDataDir());
-    Assert.assertEquals(17, config.setClientRetryTimeout(17).getClientRetryTimeout());
+    Assert.assertEquals(17, config.setConnectionRetryTimeout(17).getConnectionRetryTimeout());
+    Assert.assertEquals("/path/to/dir", config.setObserverInitDir("/path/to/dir")
+        .getObserverInitDir());
+    Assert.assertEquals("hdfs://localhost/mydir",
+        config.setObserverJarsUrl("hdfs://localhost/mydir").getObserverJarsUrl());
+    Assert.assertEquals("hdfs123", config.setDfsRoot("hdfs123").getDfsRoot());
   }
 
   @Test
@@ -169,8 +178,8 @@ public class FluoConfigurationTest {
   }
 
   @Test
-  public void testLoadingPropsFile() {
-    File propsFile = new File("../distribution/src/main/config/fluo.properties");
+  public void testLoadingOldPropsFile() {
+    File propsFile = new File("../distribution/src/main/config/fluo.properties.deprecated");
     Assert.assertTrue(propsFile.exists());
 
     FluoConfiguration config = new FluoConfiguration(propsFile);
@@ -198,6 +207,91 @@ public class FluoConfigurationTest {
   }
 
   @Test
+  public void testLoadingDistPropsFile() {
+    File connectionProps = new File("../distribution/src/main/config/fluo-conn.properties");
+    Assert.assertTrue(connectionProps.exists());
+    File applicationProps = new File("../distribution/src/main/config/fluo-app.properties");
+    Assert.assertTrue(applicationProps.exists());
+
+    FluoConfiguration config = new FluoConfiguration(connectionProps);
+    config.load(applicationProps);
+    // check for values set in prop file
+    Assert.assertEquals("localhost/fluo", config.getInstanceZookeepers());
+    Assert.assertEquals("localhost", config.getAccumuloZookeepers());
+    Assert.assertEquals("hdfs://localhost:8020/fluo", config.getDfsRoot());
+    Assert.assertEquals("", config.getAccumuloPassword());
+    Assert.assertEquals("", config.getObserverProvider());
+    Assert.assertEquals("", config.getObserverInitDir());
+    Assert.assertEquals("", config.getAccumuloJars());
+    Assert.assertEquals("", config.getObserverJarsUrl());
+
+    try {
+      config.getApplicationName();
+      Assert.fail();
+    } catch (NoSuchElementException e) {
+    }
+    try {
+      config.getAccumuloUser();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+    }
+    try {
+      config.getAccumuloInstance();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+    }
+  }
+
+  @Test
+  public void testLoadingOldTestPropsFile() {
+    File propsFile = new File("src/test/resources/fluo.properties");
+    Assert.assertTrue(propsFile.exists());
+
+    FluoConfiguration config = new FluoConfiguration(propsFile);
+    // make sure classpath contains comma. otherwise it was shortened
+    Assert.assertTrue(config.getAccumuloClasspath().contains(","));
+    // check for values set in prop file
+    Assert.assertEquals("app1", config.getApplicationName());
+    Assert.assertEquals("localhost/fluo2", config.getInstanceZookeepers());
+    Assert.assertEquals(3, config.getZookeeperTimeout());
+    Assert.assertEquals("instance4", config.getAccumuloInstance());
+    Assert.assertEquals("user5", config.getAccumuloUser());
+    Assert.assertEquals("pass6", config.getAccumuloPassword());
+    Assert.assertEquals("zoo7", config.getAccumuloZookeepers());
+    Assert.assertEquals(8, config.getClientRetryTimeout());
+    Assert.assertEquals(8, config.getConnectionRetryTimeout());
+    Assert.assertEquals("table9", config.getAccumuloTable());
+  }
+
+  @Test
+  public void testLoadingTestPropsFile() {
+    File applicationProps = new File("src/test/resources/fluo-app.properties");
+    Assert.assertTrue(applicationProps.exists());
+
+    FluoConfiguration config = new FluoConfiguration(applicationProps);
+    config.setApplicationName("test-app");
+    Assert.assertEquals("com.foo.FooObserverProvider", config.getObserverProvider());
+    Assert.assertEquals("test-app", config.getApplicationName());
+    Assert.assertEquals("/path/to/observer/foo/", config.getObserverInitDir());
+    Assert.assertEquals("myInstance", config.getAccumuloInstance());
+    Assert.assertEquals("test-app", config.getAccumuloTable());
+    Assert.assertEquals("testUser", config.getAccumuloUser());
+    Assert.assertEquals("testPass", config.getAccumuloPassword());
+    Assert.assertEquals("myhost", config.getAccumuloZookeepers());
+    Assert.assertEquals("hdfs://myhost:10000", config.getDfsRoot());
+    Assert.assertEquals("localhost/fluo", config.getInstanceZookeepers());
+    Assert.assertEquals(30000, config.getZookeeperTimeout());
+    Assert.assertEquals(-1, config.getConnectionRetryTimeout());
+
+    File connectionProps = new File("src/test/resources/fluo-conn.properties");
+    Assert.assertTrue(applicationProps.exists());
+    config.load(connectionProps);
+    Assert.assertEquals("localhost/test-fluo", config.getInstanceZookeepers());
+    Assert.assertEquals(50000, config.getZookeeperTimeout());
+    Assert.assertEquals(3000, config.getConnectionRetryTimeout());
+  }
+
+  @Test
   public void testMetricsProp() throws Exception {
     FluoConfiguration config = new FluoConfiguration();
     config.setProperty(FluoConfiguration.REPORTER_PREFIX + ".slf4j.logger", "m1");
@@ -298,7 +392,7 @@ public class FluoConfigurationTest {
   private void assertGetNameIAE(String name) {
     FluoConfiguration config = new FluoConfiguration();
     try {
-      config.setProperty(FluoConfiguration.CLIENT_APPLICATION_NAME_PROP, name);
+      config.setProperty(FluoConfiguration.CONNECTION_APPLICATION_NAME_PROP, name);
       config.getApplicationName();
       Assert.fail();
     } catch (IllegalArgumentException e) {
@@ -350,7 +444,7 @@ public class FluoConfigurationTest {
     }
     String[] nonEmptyMethods =
         {"setAccumuloInstance", "setAccumuloTable", "setAccumuloUser", "setAccumuloZookeepers",
-            "setMiniDataDir", "setInstanceZookeepers"};
+            "setMiniDataDir", "setInstanceZookeepers", "setDfsRoot"};
     for (String methodName : nonEmptyMethods) {
       try {
         config.getClass().getMethod(methodName, String.class).invoke(config, "");
diff --git a/modules/api/src/test/resources/fluo-app.properties b/modules/api/src/test/resources/fluo-app.properties
new file mode 100644
index 0000000..cd0b30f
--- /dev/null
+++ b/modules/api/src/test/resources/fluo-app.properties
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+# agreements.  See the NOTICE file distributed with this work for additional information regarding
+# copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with the License.  You may
+# obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
+
+########################
+# Application properties
+########################
+
+# NOTE - All properties that have a default are set with it. Uncomment
+# a property if you want to use a value different than the default. 
+# Properties that have no default are uncommented and must be set by
+# the user.
+
+## Application properties
+## ----------------------
+## Specifies an observer provider.  This should be the name of a class that
+## implements org.apache.fluo.api.observer.ObserverProvider.
+fluo.observer.provider=com.foo.FooObserverProvider
+## Observer jars in this directory are stored away in HDFS during initialization
+fluo.observer.init.dir=/path/to/observer/foo/
+## Properties with a prefix of fluo.app.* are stored in zookeeper at
+## initialization time and can easily be retrieved by a Fluo application running
+## on any node in the cluster.
+fluo.app.key1=val1
+
+## DFS properties
+## ---------------
+## Fluo DFS root path. Should be prefixed with 'fs.defaultFS' property in Hadoop's core-site.xml
+fluo.dfs.root=hdfs://myhost:10000
+
+## Accumulo properties
+## -------------------
+## Accumulo instance to connect to
+fluo.accumulo.instance=myInstance
+## Accumulo table to initialize
+fluo.accumulo.table=\${fluo.connection.application.name}
+## Accumulo user
+fluo.accumulo.user=testUser
+## Accumulo password
+fluo.accumulo.password=testPass
+## Accumulo zookeepers
+fluo.accumulo.zookeepers=myhost
+
+# Transaction properties
+# ----------------------
+# Amount of time (in milliseconds) clients wait before rolling back transaction
+#fluo.tx.rollback.time=300000
+
+## Worker properties
+## -----------------
+# Number of threads in each worker instance
+#fluo.worker.num.threads=10
+
+## Loader properties
+## -----------------
+## Number of threads each loader runs.  Can set to zero for no threads, thread
+## adding Loader will execute.  Must also set fluo.loader.queue.size to zero
+## when setting this to zero.
+#fluo.loader.num.threads=10
+## Queue size of loader
+#fluo.loader.queue.size=10
+
+## Metrics
+## -------
+## Configure reporters for metrics. The frequency for each type of reporter is in seconds.
+
+#fluo.metrics.reporter.console.enable=false
+#fluo.metrics.reporter.console.target=stdout
+#fluo.metrics.reporter.console.rateUnit=seconds
+#fluo.metrics.reporter.console.durationUnit=milliseconds
+#fluo.metrics.reporter.console.frequency=60
+
+#fluo.metrics.reporter.csv.enable=false
+#fluo.metrics.reporter.csv.dir=/tmp/
+#fluo.metrics.reporter.csv.rateUnit=seconds
+#fluo.metrics.reporter.csv.durationUnit=milliseconds
+#fluo.metrics.reporter.csv.frequency=60
+
+#fluo.metrics.reporter.graphite.enable=false
+#fluo.metrics.reporter.graphite.host=carbon.server.com
+#fluo.metrics.reporter.graphite.port=8080
+#fluo.metrics.reporter.graphite.rateUnit=seconds
+#fluo.metrics.reporter.graphite.durationUnit=milliseconds
+#fluo.metrics.reporter.graphite.frequency=60
+#fluo.metrics.reporter.graphite.prefix=
+
+#fluo.metrics.reporter.jmx.enable=false
+#fluo.metrics.reporter.jmx.rateUnit=seconds
+#fluo.metrics.reporter.jmx.durationUnit=milliseconds
+
+#fluo.metrics.reporter.slf4j.enable=false
+#fluo.metrics.reporter.slf4j.logger=metrics
+#fluo.metrics.reporter.slf4j.rateUnit=seconds
+#fluo.metrics.reporter.slf4j.durationUnit=milliseconds
diff --git a/modules/api/src/test/resources/fluo-conn.properties b/modules/api/src/test/resources/fluo-conn.properties
new file mode 100644
index 0000000..0279c0c
--- /dev/null
+++ b/modules/api/src/test/resources/fluo-conn.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+# agreements.  See the NOTICE file distributed with this work for additional information regarding
+# copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with the License.  You may
+# obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
+
+#######################
+# Connection properties
+#######################
+
+# NOTE - All properties that have a default are set with it.  Uncomment
+# a property if you want to use a value different than the default. 
+# Properties that have no default are uncommented and must be set by
+# the user.
+
+## Zookeeper connection string specifying host and chroot where Fluo stores data.
+## A chroot directory suffix must be specified but doesn't need to be named
+## '/fluo'.  If not specified, a Fluo application cannot be initialized.
+fluo.connection.zookeepers=localhost/test-fluo
+## Zookeeper timeout
+fluo.connection.zookeeper.timeout=50000
+## Connection retry timeout (in milliseconds). Set to -1 to retry forever.
+fluo.connection.retry.timeout.ms=3000
diff --git a/modules/distribution/src/main/config/fluo.properties b/modules/api/src/test/resources/fluo.properties
similarity index 91%
copy from modules/distribution/src/main/config/fluo.properties
copy to modules/api/src/test/resources/fluo.properties
index f9f603d..4b86285 100644
--- a/modules/distribution/src/main/config/fluo.properties
+++ b/modules/api/src/test/resources/fluo.properties
@@ -15,8 +15,9 @@
 # Fluo properties
 #################
 
-# NOTE - All properties that have a default are set with it.  Uncomment
-# a property if you want to use a value different than the default. 
+# NOTE - This file has been deprecated and replaced by fluo-conn.properties &
+# fluo-app.properties. All properties that have a default are set with it.
+# Uncomment a property if you want to use a value different than the default.
 # Properties that have no default are uncommented and must be set by
 # the user.  Most are unset except for fluo.accumulo.classpath which
 # has a suggested value.
@@ -24,30 +25,30 @@
 # Client properties
 # -----------------
 # Fluo application name
-fluo.client.application.name=
+fluo.client.application.name=app1
 # Zookeeper connection string specifying host and chroot where Fluo stores data.
 # A chroot directory suffix must be specified but doesn't need to be named
 # '/fluo'.  If not specified, a Fluo application cannot be initialized.
 # Interpolation (i.e \${fluo.client.accumulo.zookeepers}/fluo) can be used
 # when setting this to reuse Accumulo's zookeeper connection string.
-#fluo.client.zookeeper.connect=localhost/fluo
+fluo.client.zookeeper.connect=localhost/fluo2
 # Zookeeper timeout
-#fluo.client.zookeeper.timeout=30000
+fluo.client.zookeeper.timeout=3
 # Accumulo instance to connect to
-fluo.client.accumulo.instance=
+fluo.client.accumulo.instance=instance4
 # Accumulo user
-fluo.client.accumulo.user=
+fluo.client.accumulo.user=user5
 # Accumulo password
-fluo.client.accumulo.password=
+fluo.client.accumulo.password=pass6
 # Accumulo zookeepers
-#fluo.client.accumulo.zookeepers=localhost
+fluo.client.accumulo.zookeepers=zoo7
 # Client retry timeout (in milliseconds). Set to -1 to retry forever.
-#fluo.client.retry.timeout.ms=-1
+fluo.client.retry.timeout.ms=8
 
 # Admin properties
 # ----------------
 # Accumulo table to initialize
-fluo.admin.accumulo.table=\${fluo.client.application.name}
+fluo.admin.accumulo.table=table9
 # HDFS root path. Should match 'fs.defaultFS' property in Hadoop's core-site.xml
 fluo.admin.hdfs.root=hdfs://localhost:10000
 # Fluo uses iterators within Accumulo tablet servers, therefore Accumulo per
diff --git a/modules/cluster/pom.xml b/modules/cluster/pom.xml
index efce104..b740571 100644
--- a/modules/cluster/pom.xml
+++ b/modules/cluster/pom.xml
@@ -110,10 +110,5 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
 </project>
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/command/FluoCommand.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/command/FluoCommand.java
index 24852f1..689173f 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/command/FluoCommand.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/command/FluoCommand.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Implementation of Fluo command
  */
+@Deprecated
 public class FluoCommand {
 
   public static void verifyNoArgs(String[] remainArgs) {
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java
index 277f7a8..5939710 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Run method of Fluo oracle that is called within a Twill/YARN application
  */
+@Deprecated
 public class OracleRunnable extends AbstractTwillRunnable {
 
   private static final Logger log = LoggerFactory.getLogger(OracleRunnable.class);
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java
index a1bf9b6..fa2229b 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Run method of Fluo worker that is called within a Twill/YARN application
  */
+@Deprecated
 public class WorkerRunnable extends AbstractTwillRunnable {
 
   private static final Logger log = LoggerFactory.getLogger(WorkerRunnable.class);
@@ -75,7 +76,7 @@ public class WorkerRunnable extends AbstractTwillRunnable {
         System.exit(-1);
       }
       // any client in worker should retry forever
-      config.setClientRetryTimeout(-1);
+      config.setConnectionRetryTimeout(-1);
 
       try {
         config.validate();
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
index a984469..7e938e9 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
@@ -17,45 +17,32 @@ package org.apache.fluo.cluster.runner;
 
 import java.lang.reflect.Method;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-
+import java.util.Collections;
+import java.util.List;
 import javax.inject.Provider;
 
 import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
-import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.fluo.accumulo.format.FluoFormatter;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.cluster.util.FluoYarnConfig;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.Notification;
-import org.apache.fluo.core.util.AccumuloUtil;
-import org.apache.fluo.core.util.ByteUtil;
-import org.apache.fluo.core.util.Hex;
-import org.apache.fluo.core.util.SpanUtil;
+import org.apache.fluo.core.util.ScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Base class for running a Fluo application
  */
+@Deprecated
 public abstract class AppRunner {
 
   private static final Logger log = LoggerFactory.getLogger(AppRunner.class);
@@ -68,62 +55,7 @@ public abstract class AppRunner {
     this.scriptName = scriptName;
   }
 
-  public static Span getSpan(ScanOptions options) {
-    Span span = new Span();
-    if ((options.getExactRow() != null)
-        && ((options.getStartRow() != null) || (options.getEndRow() != null) || (options
-            .getRowPrefix() != null))) {
-      throw new IllegalArgumentException(
-          "You cannot specify an exact row with a start/end row or row prefix!");
-    }
-
-    if ((options.getRowPrefix() != null)
-        && ((options.getStartRow() != null) || (options.getEndRow() != null) || (options
-            .getExactRow() != null))) {
-      throw new IllegalArgumentException(
-          "You cannot specify an prefix row with a start/end row or exact row!");
-    }
-
-    // configure span of scanner
-    if (options.getExactRow() != null) {
-      span = Span.exact(options.getExactRow());
-    } else if (options.getRowPrefix() != null) {
-      span = Span.prefix(options.getRowPrefix());
-    } else {
-      if ((options.getStartRow() != null) && (options.getEndRow() != null)) {
-        span = new Span(options.getStartRow(), true, options.getEndRow(), true);
-      } else if (options.getStartRow() != null) {
-        span = new Span(Bytes.of(options.getStartRow()), true, Bytes.EMPTY, true);
-      } else if (options.getEndRow() != null) {
-        span = new Span(Bytes.EMPTY, true, Bytes.of(options.getEndRow()), true);
-      }
-    }
-
-    return span;
-  }
-
-  public static Collection<Column> getColumns(ScanOptions options) {
-    Collection<Column> columns = new HashSet<>();
-
-    // configure columns of scanner
-    for (String column : options.getColumns()) {
-      String[] colFields = column.split(":");
-      if (colFields.length == 1) {
-        columns.add(new Column(colFields[0]));
-      } else if (colFields.length == 2) {
-        columns.add(new Column(colFields[0], colFields[1]));
-      } else {
-        throw new IllegalArgumentException("Failed to scan!  Column '" + column
-            + "' has too many fields (indicated by ':')");
-      }
-    }
-
-    return columns;
-  }
-
-
-
-  public long scan(FluoConfiguration config, String[] args) {
+  public void scan(FluoConfiguration config, String[] args) {
     ScanOptions options = new ScanOptions();
     JCommander jcommand = new JCommander(options);
     jcommand.setProgramName(scriptName + " scan <app>");
@@ -141,108 +73,12 @@ public abstract class AppRunner {
     }
 
     if (options.scanAccumuloTable) {
-      return scanAccumulo(options, config);
+      ScanUtil.scanAccumulo(options.getScanOpts(), config);
     } else {
-      return scanFluo(options, config);
+      ScanUtil.scanFluo(options.getScanOpts(), config);
     }
   }
 
-  private long scanFluo(ScanOptions options, FluoConfiguration sConfig) {
-    System.out.println("Scanning snapshot of data in Fluo '" + sConfig.getApplicationName()
-        + "' application.");
-
-    long entriesFound = 0;
-    try (FluoClient client = FluoFactory.newClient(sConfig)) {
-      try (Snapshot s = client.newSnapshot()) {
-
-        Span span = null;
-        Collection<Column> columns = null;
-        try {
-          span = getSpan(options);
-          columns = getColumns(options);
-        } catch (IllegalArgumentException e) {
-          System.err.println(e.getMessage());
-          System.exit(-1);
-        }
-
-        CellScanner cellScanner = s.scanner().over(span).fetch(columns).build();
-
-        StringBuilder sb = new StringBuilder();
-        for (RowColumnValue rcv : cellScanner) {
-          if (options.hexEncNonAscii) {
-            sb.setLength(0);
-            Hex.encNonAscii(sb, rcv.getRow());
-            sb.append(" ");
-            Hex.encNonAscii(sb, rcv.getColumn(), " ");
-            sb.append("\t");
-            Hex.encNonAscii(sb, rcv.getValue());
-            System.out.println(sb.toString());
-          } else {
-            sb.setLength(0);
-            sb.append(rcv.getsRow());
-            sb.append(" ");
-            sb.append(rcv.getColumn());
-            sb.append("\t");
-            sb.append(rcv.getsValue());
-            System.out.println(sb.toString());
-          }
-          entriesFound++;
-          if (System.out.checkError()) {
-            break;
-          }
-        }
-
-        if (entriesFound == 0) {
-          System.out.println("\nNo data found\n");
-        }
-      } catch (FluoException e) {
-        System.out.println("Scan failed - " + e.getMessage());
-      }
-    }
-    return entriesFound;
-  }
-
-  private long scanAccumulo(ScanOptions options, FluoConfiguration sConfig) {
-    System.out.println("Scanning data in Accumulo directly for '" + sConfig.getApplicationName()
-        + "' application.");
-
-    Connector conn = AccumuloUtil.getConnector(sConfig);
-
-    Span span = null;
-    Collection<Column> columns = null;
-    try {
-      span = getSpan(options);
-      columns = getColumns(options);
-    } catch (IllegalArgumentException e) {
-      System.err.println(e.getMessage());
-      System.exit(-1);
-    }
-
-    long entriesFound = 0;
-
-    try {
-      Scanner scanner = conn.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
-      scanner.setRange(SpanUtil.toRange(span));
-      for (Column col : columns) {
-        if (col.isQualifierSet()) {
-          scanner
-              .fetchColumn(ByteUtil.toText(col.getFamily()), ByteUtil.toText(col.getQualifier()));
-        } else {
-          scanner.fetchColumnFamily(ByteUtil.toText(col.getFamily()));
-        }
-      }
-
-      for (String entry : Iterables.transform(scanner, FluoFormatter::toString)) {
-        System.out.println(entry);
-      }
-    } catch (Exception e) {
-      System.out.println("Scan failed - " + e.getMessage());
-      entriesFound++;
-    }
-
-    return entriesFound;
-  }
-
   private long calculateSleep(long notifyCount, long numWorkers) {
     long sleep = notifyCount / numWorkers / 100;
     if (sleep < MIN_SLEEP_SEC) {
@@ -320,7 +156,6 @@ public abstract class AppRunner {
         }
       });
     }
-
   }
 
   public void exec(FluoConfiguration fluoConfig, String[] args) throws Exception {
@@ -336,4 +171,65 @@ public abstract class AppRunner {
     Method method = clazz.getMethod("main", String[].class);
     method.invoke(null, (Object) Arrays.copyOfRange(args, 1, args.length));
   }
+
+  public static class ScanOptions {
+
+    @Parameter(names = "-s", description = "Start row (inclusive) of scan")
+    private String startRow;
+
+    @Parameter(names = "-e", description = "End row (inclusive) of scan")
+    private String endRow;
+
+    @Parameter(names = "-c", description = "Columns of scan in comma separated format: "
+        + "<<columnfamily>[:<columnqualifier>]{,<columnfamily>[:<columnqualifier>]}> ")
+    private List<String> columns;
+
+    @Parameter(names = "-r", description = "Exact row to scan")
+    private String exactRow;
+
+    @Parameter(names = "-p", description = "Row prefix to scan")
+    private String rowPrefix;
+
+    @Parameter(names = {"-h", "-help", "--help"}, help = true, description = "Prints help")
+    public boolean help;
+
+    @Parameter(names = {"-esc", "--escape-non-ascii"}, help = true,
+        description = "Hex encode non ascii bytes", arity = 1)
+    public boolean hexEncNonAscii = true;
+
+    @Parameter(
+        names = "--raw",
+        help = true,
+        description = "Show underlying key/values stored in Accumulo. Interprets the data using Fluo "
+            + "internal schema, making it easier to comprehend.")
+    public boolean scanAccumuloTable = false;
+
+    public String getStartRow() {
+      return startRow;
+    }
+
+    public String getEndRow() {
+      return endRow;
+    }
+
+    public String getExactRow() {
+      return exactRow;
+    }
+
+    public String getRowPrefix() {
+      return rowPrefix;
+    }
+
+    public List<String> getColumns() {
+      if (columns == null) {
+        return Collections.emptyList();
+      }
+      return columns;
+    }
+
+    public ScanUtil.ScanOpts getScanOpts() {
+      return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow, rowPrefix, help,
+          hexEncNonAscii, scanAccumuloTable);
+    }
+  }
 }
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java
index fbfb91c..907ba67 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java
@@ -29,6 +29,7 @@ import org.apache.fluo.core.client.FluoAdminImpl;
 /**
  * For running Fluo app on cluster
  */
+@Deprecated
 public abstract class ClusterAppRunner extends AppRunner {
 
   public ClusterAppRunner(String scriptName) {
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ScanOptions.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ScanOptions.java
deleted file mode 100644
index 6670202..0000000
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ScanOptions.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.runner;
-
-import java.util.Collections;
-import java.util.List;
-
-import com.beust.jcommander.Parameter;
-
-public class ScanOptions {
-
-  @Parameter(names = "-s", description = "Start row (inclusive) of scan")
-  private String startRow;
-
-  @Parameter(names = "-e", description = "End row (inclusive) of scan")
-  private String endRow;
-
-  @Parameter(names = "-c", description = "Columns of scan in comma separated format: "
-      + "<<columnfamily>[:<columnqualifier>]{,<columnfamily>[:<columnqualifier>]}> ")
-  private List<String> columns;
-
-  @Parameter(names = "-r", description = "Exact row to scan")
-  private String exactRow;
-
-  @Parameter(names = "-p", description = "Row prefix to scan")
-  private String rowPrefix;
-
-  @Parameter(names = {"-h", "-help", "--help"}, help = true, description = "Prints help")
-  public boolean help;
-
-  @Parameter(names = {"-esc", "--escape-non-ascii"}, help = true,
-      description = "Hex encode non ascii bytes", arity = 1)
-  public boolean hexEncNonAscii = true;
-
-  @Parameter(
-      names = "--raw",
-      help = true,
-      description = "Show underlying key/values stored in Accumulo. Interprets the data using Fluo "
-          + "internal schema, making it easier to comprehend.")
-  public boolean scanAccumuloTable = false;
-
-  public String getStartRow() {
-    return startRow;
-  }
-
-  public String getEndRow() {
-    return endRow;
-  }
-
-  public String getExactRow() {
-    return exactRow;
-  }
-
-  public String getRowPrefix() {
-    return rowPrefix;
-  }
-
-  public List<String> getColumns() {
-    if (columns == null) {
-      return Collections.emptyList();
-    }
-    return columns;
-  }
-}
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
index 7894bce..4a65384 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
@@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Yarn Implementation of ClusterAppRunner
  */
+@Deprecated
 public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
 
   private static final Logger log = LoggerFactory.getLogger(YarnAppRunner.class);
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ClusterUtil.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ClusterUtil.java
index 1d5c675..ae67555 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ClusterUtil.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ClusterUtil.java
@@ -17,6 +17,7 @@ package org.apache.fluo.cluster.util;
 
 import java.io.File;
 
+@Deprecated
 public class ClusterUtil {
 
   private ClusterUtil() {}
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoInstall.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoInstall.java
index b043f04..447ef78 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoInstall.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoInstall.java
@@ -20,6 +20,7 @@ import java.io.File;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.exceptions.FluoException;
 
+@Deprecated
 public class FluoInstall {
 
   private String fluoHomeDir;
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java
index f078c44..acc456c 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java
@@ -18,6 +18,7 @@ package org.apache.fluo.cluster.util;
 import com.google.common.base.Preconditions;
 import org.apache.fluo.api.config.FluoConfiguration;
 
+@Deprecated
 public class FluoYarnConfig {
 
   private static final String YARN_PREFIX = FluoConfiguration.FLUO_PREFIX + ".yarn";
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/LogbackUtil.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/LogbackUtil.java
index 079e0c6..5394edb 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/LogbackUtil.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/LogbackUtil.java
@@ -30,6 +30,7 @@ import static org.apache.fluo.api.config.FluoConfiguration.FLUO_PREFIX;
 /**
  * Used to initialize Logging for cluster applications
  */
+@Deprecated
 public class LogbackUtil {
 
   private static final Logger log = LoggerFactory.getLogger(LogbackUtil.class);
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ValidateAppName.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ValidateAppName.java
index 50757cf..e765819 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ValidateAppName.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ValidateAppName.java
@@ -17,6 +17,7 @@ package org.apache.fluo.cluster.util;
 
 import org.apache.fluo.api.config.FluoConfiguration;
 
+@Deprecated
 public class ValidateAppName {
 
   public static void main(String[] args) {
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
index 02f27fd..7308d8c 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Represents Fluo application in Twill
  */
+@Deprecated
 public class FluoTwillApp implements TwillApplication {
 
   private static final Logger log = LoggerFactory.getLogger(FluoTwillApp.class);
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/TwillUtil.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/TwillUtil.java
index 6e903fc..58435a0 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/TwillUtil.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/TwillUtil.java
@@ -23,6 +23,7 @@ import org.apache.twill.api.TwillRunResources;
 /**
  * Twill Utility classes
  */
+@Deprecated
 public class TwillUtil {
 
   private TwillUtil() {}
diff --git a/modules/cluster/src/test/resources/log4j.xml b/modules/cluster/src/test/resources/log4j.xml
deleted file mode 100644
index bb60fe0..0000000
--- a/modules/cluster/src/test/resources/log4j.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more contributor license
-  agreements. See the NOTICE file distributed with this work for additional information regarding
-  copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance with the License. You may obtain a
-  copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software distributed under the License
-  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-  or implied. See the License for the specific language governing permissions and limitations under
-  the License.
--->
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-
-  <appender name="console" class="org.apache.log4j.ConsoleAppender">
-    <param name="Target" value="System.out"/>
-    <layout class="org.apache.log4j.PatternLayout">
-      <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: %m%n" />
-    </layout>
-  </appender>
-
-  <logger name="org.apache.zookeeper">
-    <level value="ERROR" />
-  </logger>
-
-  <logger name="org.apache.curator">
-    <level value="ERROR" />
-  </logger>
-
-  <logger name="org.apache.fluo">
-    <level value="ERROR" />
-  </logger>
-
-  <root>
-    <level value="INFO" />
-    <appender-ref ref="console" />
-  </root>
-</log4j:configuration>
diff --git a/modules/cluster/pom.xml b/modules/command/pom.xml
similarity index 56%
copy from modules/cluster/pom.xml
copy to modules/command/pom.xml
index efce104..8c8bf46 100644
--- a/modules/cluster/pom.xml
+++ b/modules/command/pom.xml
@@ -21,23 +21,11 @@
     <version>1.2.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
-  <artifactId>fluo-cluster</artifactId>
-  <name>Apache Fluo Cluster</name>
-  <description>This module contains all code necessary to run Apache Fluo on a YARN cluster using
-    Apache Twill. It was separated from fluo-core to keep dependencies (like Twill) out of Fluo
-    clients which depend on the fluo-core jar.  It was also done to limit conflicts. For example,
-    Twill requires logback to be used but fluo-core requires log4j (due to zookeeper requiring it in
-    accumulo-minicluster).</description>
+  <artifactId>fluo-command</artifactId>
+  <name>Apache Fluo Command</name>
+  <description>This module contains code for Apache Fluo commands</description>
   <dependencies>
     <dependency>
-      <groupId>ch.qos.logback</groupId>
-      <artifactId>logback-classic</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>ch.qos.logback</groupId>
-      <artifactId>logback-core</artifactId>
-    </dependency>
-    <dependency>
       <groupId>com.beust</groupId>
       <artifactId>jcommander</artifactId>
     </dependency>
@@ -51,19 +39,12 @@
       <version>4.0</version>
     </dependency>
     <dependency>
-      <groupId>javax.inject</groupId>
-      <artifactId>javax.inject</artifactId>
-      <version>1</version>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-core</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.curator</groupId>
@@ -78,37 +59,12 @@
       <artifactId>fluo-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-client</artifactId>
-      <exclusions>
-        <!-- Excluded so we use asm jar included by twill-yarn.  See FLUO-409 -->
-        <exclusion>
-          <groupId>asm</groupId>
-          <artifactId>asm</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.twill</groupId>
-      <artifactId>twill-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.twill</groupId>
-      <artifactId>twill-yarn</artifactId>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
+      <artifactId>slf4j-log4j12</artifactId>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ValidateAppName.java b/modules/command/src/main/java/org/apache/fluo/command/CommandUtil.java
similarity index 56%
copy from modules/cluster/src/main/java/org/apache/fluo/cluster/util/ValidateAppName.java
copy to modules/command/src/main/java/org/apache/fluo/command/CommandUtil.java
index 50757cf..eece560 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ValidateAppName.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/CommandUtil.java
@@ -13,23 +13,26 @@
  * the License.
  */
 
-package org.apache.fluo.cluster.util;
+package org.apache.fluo.command;
 
 import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.client.FluoAdminImpl;
 
-public class ValidateAppName {
+public class CommandUtil {
 
-  public static void main(String[] args) {
-    if (args.length != 1) {
-      System.out.println("ERROR - Expected usage: ValidateAppName <fluoApplicationName>");
+  public static void verifyAppInitialized(FluoConfiguration config) {
+    if (!FluoAdminImpl.isInitialized(config)) {
+      System.out.println("A Fluo '" + config.getApplicationName() + "' application has not "
+          + "been initialized yet in Zookeeper at " + config.getAppZookeepers());
       System.exit(-1);
     }
+  }
 
-    FluoConfiguration config = new FluoConfiguration();
-    try {
-      config.setApplicationName(args[0]);
-    } catch (IllegalArgumentException e) {
-      System.out.println("ERROR - " + e.getMessage());
+  public static void verifyAppRunning(FluoConfiguration config) {
+    verifyAppInitialized(config);
+    if (!FluoAdminImpl.oracleExists(config)) {
+      System.out.println("A Fluo '" + config.getApplicationName() + "' application is initialized "
+          + "but is not running!");
       System.exit(-1);
     }
   }
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoConfig.java b/modules/command/src/main/java/org/apache/fluo/command/FluoConfig.java
new file mode 100644
index 0000000..3c0b02d
--- /dev/null
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.command;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.client.FluoAdmin;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.core.client.FluoAdminImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FluoConfig {
+
+  private static final Logger log = LoggerFactory.getLogger(FluoConfig.class);
+
+  public static void main(String[] args) {
+    if (args.length != 2) {
+      System.err.println("Usage: FluoConfig <connectionPropsPath> <applicationName>");
+      System.exit(-1);
+    }
+    String connectionPropsPath = args[0];
+    String applicationName = args[1];
+    Objects.requireNonNull(connectionPropsPath);
+    Objects.requireNonNull(applicationName);
+    File connectionPropsFile = new File(connectionPropsPath);
+    Preconditions.checkArgument(connectionPropsFile.exists(), connectionPropsPath
+        + " does not exist");
+
+    FluoConfiguration config = new FluoConfiguration(connectionPropsFile);
+    config.setApplicationName(applicationName);
+    CommandUtil.verifyAppInitialized(config);
+
+    try (FluoAdmin admin = FluoFactory.newAdmin(config)) {
+      for (Map.Entry<String, String> entry : admin.getApplicationConfig().toMap().entrySet()) {
+        System.out.println(entry.getKey() + " = " + entry.getValue());
+      }
+    }
+  }
+}
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoExec.java b/modules/command/src/main/java/org/apache/fluo/command/FluoExec.java
new file mode 100644
index 0000000..61c04a2
--- /dev/null
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoExec.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.command;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Objects;
+
+import javax.inject.Provider;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.client.FluoAdminImpl;
+
+public class FluoExec {
+
+  private static class FluoConfigModule extends AbstractModule {
+
+    private Class<?> clazz;
+    private FluoConfiguration fluoConfig;
+
+    FluoConfigModule(Class<?> clazz, FluoConfiguration fluoConfig) {
+      this.clazz = clazz;
+      this.fluoConfig = fluoConfig;
+    }
+
+    @Override
+    protected void configure() {
+      requestStaticInjection(clazz);
+      bind(FluoConfiguration.class).toProvider((Provider<FluoConfiguration>) () -> fluoConfig);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.err.println("Usage: FluoExec <connectionPropsPath> <applicationName> <class> args...");
+      System.exit(-1);
+    }
+    final String connectionPropsPath = args[0];
+    final String applicationName = args[1];
+    final String className = args[2];
+    Objects.requireNonNull(connectionPropsPath);
+    File connectionPropsFile = new File(connectionPropsPath);
+    Preconditions.checkArgument(connectionPropsFile.exists(), connectionPropsPath
+        + " does not exist");
+
+    FluoConfiguration fluoConfig = new FluoConfiguration(connectionPropsFile);
+    fluoConfig.setApplicationName(applicationName);
+    CommandUtil.verifyAppInitialized(fluoConfig);
+    fluoConfig = FluoAdminImpl.mergeZookeeperConfig(fluoConfig);
+
+    Arrays.copyOfRange(args, 3, args.length);
+
+    Class<?> clazz = Class.forName(className);
+
+    // inject fluo configuration
+    Guice.createInjector(new FluoConfigModule(clazz, fluoConfig));
+
+    Method method = clazz.getMethod("main", String[].class);
+    method.invoke(null, (Object) Arrays.copyOfRange(args, 3, args.length));
+  }
+}
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoGetJars.java b/modules/command/src/main/java/org/apache/fluo/command/FluoGetJars.java
new file mode 100644
index 0000000..a866515
--- /dev/null
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoGetJars.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.command;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.client.FluoAdminImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FluoGetJars {
+
+  private static final Logger log = LoggerFactory.getLogger(FluoGetJars.class);
+
+  public static void main(String[] args) {
+    if (args.length != 3) {
+      System.err
+          .println("Usage: FluoGetJars <connectionPropsPath> <applicationName> <downloadPath>");
+      System.exit(-1);
+    }
+    String connectionPropsPath = args[0];
+    String applicationName = args[1];
+    String downloadPath = args[2];
+    Objects.requireNonNull(connectionPropsPath);
+    Objects.requireNonNull(applicationName);
+    File connectionPropsFile = new File(connectionPropsPath);
+    Preconditions.checkArgument(connectionPropsFile.exists(), connectionPropsPath
+        + " does not exist");
+
+    FluoConfiguration config = new FluoConfiguration(connectionPropsFile);
+    config.setApplicationName(applicationName);
+    CommandUtil.verifyAppInitialized(config);
+
+    config = FluoAdminImpl.mergeZookeeperConfig(config);
+
+    if (config.getObserverJarsUrl().isEmpty()) {
+      log.info("No observer jars found for the '{}' Fluo application!", applicationName);
+      return;
+    }
+
+    try {
+      if (config.getObserverJarsUrl().startsWith("hdfs://")) {
+        FileSystem fs = FileSystem.get(new URI(config.getDfsRoot()), new Configuration());
+        File downloadPathFile = new File(downloadPath);
+        if (downloadPathFile.exists()) {
+          FileUtils.deleteDirectory(downloadPathFile);
+        }
+        fs.copyToLocalFile(new Path(config.getObserverJarsUrl()), new Path(downloadPath));
+      } else {
+        log.error("Unsupported url prefix for {}={}", FluoConfiguration.OBSERVER_JARS_URL_PROP,
+            config.getObserverJarsUrl());
+      }
+    } catch (Exception e) {
+      log.error("", e);
+    }
+  }
+}
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoGetProp.java b/modules/command/src/main/java/org/apache/fluo/command/FluoGetProp.java
new file mode 100644
index 0000000..61e3254
--- /dev/null
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoGetProp.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.command;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.client.FluoAdminImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FluoGetProp {
+
+  public static void main(String[] args) {
+    if (args.length != 3) {
+      System.err
+          .println("Usage: FluoGetProps <connectionPropsPath> <applicationPropsPath> <property>");
+      System.exit(-1);
+    }
+    final String connectionPropsPath = args[0];
+    final String applicationPropsPath = args[1];
+    final String prop = args[2];
+    Objects.requireNonNull(connectionPropsPath);
+    Objects.requireNonNull(applicationPropsPath);
+    File connectionPropsFile = new File(connectionPropsPath);
+    File applicationPropsFile = new File(applicationPropsPath);
+    Preconditions.checkArgument(connectionPropsFile.exists(), connectionPropsPath
+        + " does not exist");
+    Preconditions.checkArgument(applicationPropsFile.exists(), applicationPropsPath
+        + " does not exist");
+
+    FluoConfiguration config = new FluoConfiguration(connectionPropsFile);
+    config.load(applicationPropsFile);
+
+    if (config.containsKey(prop)) {
+      System.out.println(config.getString(prop));
+    }
+  }
+}
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java b/modules/command/src/main/java/org/apache/fluo/command/FluoInit.java
similarity index 74%
copy from modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java
copy to modules/command/src/main/java/org/apache/fluo/command/FluoInit.java
index fbfb91c..d3efc4f 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoInit.java
@@ -13,29 +13,26 @@
  * the License.
  */
 
-package org.apache.fluo.cluster.runner;
+package org.apache.fluo.command;
 
 import java.io.BufferedReader;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Objects;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
+import com.google.common.base.Preconditions;
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.core.client.FluoAdminImpl;
 
-/**
- * For running Fluo app on cluster
- */
-public abstract class ClusterAppRunner extends AppRunner {
-
-  public ClusterAppRunner(String scriptName) {
-    super(scriptName);
-  }
+public class FluoInit {
 
-  public class InitOptions {
+  public static class InitOptions {
 
     @Parameter(names = {"-f", "--force"},
         description = "Skip all prompts and clears Zookeeper and Accumulo table.  Equivalent to "
@@ -52,26 +49,26 @@ public abstract class ClusterAppRunner extends AppRunner {
     private boolean update;
 
     @Parameter(names = {"-h", "-help", "--help"}, help = true, description = "Prints help")
-    public boolean help;
+    boolean help;
 
-    public boolean getForce() {
+    boolean getForce() {
       return force;
     }
 
-    public boolean getClearTable() {
+    boolean getClearTable() {
       return clearTable;
     }
 
-    public boolean getClearZookeeper() {
+    boolean getClearZookeeper() {
       return clearZookeeper;
     }
 
-    public boolean getUpdate() {
+    boolean getUpdate() {
       return update;
     }
   }
 
-  public static boolean readYes() {
+  private static boolean readYes() {
     String input = "unk";
     while (true) {
       BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
@@ -90,12 +87,26 @@ public abstract class ClusterAppRunner extends AppRunner {
     }
   }
 
-  public void init(FluoConfiguration config, String propsPath, String[] args) {
+  public static void main(String[] args) {
+    if (args.length < 1) {
+      System.err
+          .println("Usage: FluoInit <connectionPropsPath> <appName> <applicationPropsPath> userArgs...");
+      System.exit(-1);
+    }
+    String connectionPropsPath = args[0];
+    Objects.requireNonNull(connectionPropsPath);
+    Preconditions.checkArgument(!connectionPropsPath.isEmpty(), "<connectionPropsPath> is empty");
+    File connectionPropsFile = new File(connectionPropsPath);
+    Preconditions.checkArgument(connectionPropsFile.exists(), connectionPropsPath
+        + " does not exist");
+
+    String[] userArgs = Arrays.copyOfRange(args, 3, args.length);
+
     InitOptions commandOpts = new InitOptions();
     JCommander jcommand = new JCommander(commandOpts);
-    jcommand.setProgramName("fluo init");
+    jcommand.setProgramName("fluo init <app> <appProps>");
     try {
-      jcommand.parse(args);
+      jcommand.parse(userArgs);
     } catch (ParameterException e) {
       System.err.println(e.getMessage());
       jcommand.usage();
@@ -104,23 +115,29 @@ public abstract class ClusterAppRunner extends AppRunner {
 
     if (commandOpts.help) {
       jcommand.usage();
-      System.exit(0);
+      System.exit(1);
     }
 
+    String applicationName = args[1];
+    Objects.requireNonNull(applicationName);
+    String applicationPropsPath = args[2];
+    Objects.requireNonNull(applicationPropsPath);
+    File applicationPropsFile = new File(applicationPropsPath);
+    Preconditions.checkArgument(applicationPropsFile.exists(), applicationPropsPath
+        + " does not exist");
+
+    FluoConfiguration config = new FluoConfiguration(connectionPropsFile);
+    config.load(applicationPropsFile);
+    config.setApplicationName(applicationName);
+
     if (!config.hasRequiredAdminProps()) {
-      System.err.println("Error - Required properties are not set in " + propsPath);
+      System.err.println("Error - Required properties are not set in " + applicationPropsPath);
       System.exit(-1);
     }
     try {
       config.validate();
-    } catch (IllegalArgumentException e) {
-      System.err.println("Error - Invalid fluo.properties (" + propsPath + ") due to "
-          + e.getMessage());
-      System.exit(-1);
     } catch (Exception e) {
-      System.err.println("Error - Invalid fluo.properties (" + propsPath + ") due to "
-          + e.getMessage());
-      e.printStackTrace();
+      System.err.println("Error - Invalid configuration due to " + e.getMessage());
       System.exit(-1);
     }
 
@@ -137,7 +154,7 @@ public abstract class ClusterAppRunner extends AppRunner {
 
       if (commandOpts.getUpdate()) {
         System.out.println("Updating configuration for the Fluo '" + config.getApplicationName()
-            + "' application in Zookeeper using " + propsPath);
+            + "' application in Zookeeper using " + applicationPropsPath);
         admin.updateSharedConfig();
         System.out.println("Update is complete.");
         System.exit(0);
@@ -176,7 +193,7 @@ public abstract class ClusterAppRunner extends AppRunner {
       }
 
       System.out.println("Initializing Fluo '" + config.getApplicationName()
-          + "' application using " + propsPath);
+          + "' application using " + applicationPropsPath);
       try {
         admin.initialize(initOpts);
       } catch (Exception e) {
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoList.java b/modules/command/src/main/java/org/apache/fluo/command/FluoList.java
new file mode 100644
index 0000000..b389d03
--- /dev/null
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoList.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.command;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.client.FluoAdminImpl;
+import org.apache.fluo.core.util.CuratorUtil;
+
+public class FluoList {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length != 1) {
+      System.err.println("Usage: FluoList <connectionPropsPath>");
+      System.exit(-1);
+    }
+    String connectionPropsPath = args[0];
+    Objects.requireNonNull(connectionPropsPath);
+    File connectionPropsFile = new File(connectionPropsPath);
+    Preconditions.checkArgument(connectionPropsFile.exists(), connectionPropsPath
+        + " does not exist");
+
+    FluoConfiguration config = new FluoConfiguration(connectionPropsFile);
+
+    try (CuratorFramework curator = CuratorUtil.newFluoCurator(config)) {
+      curator.start();
+
+      if (curator.checkExists().forPath("/") == null) {
+        System.out.println("Fluo instance (" + config.getInstanceZookeepers() + ") has not been "
+            + "created yet in Zookeeper.  It will be created when the first Fluo application is "
+            + "initialized for this instance.");
+        return;
+      }
+      List<String> children = curator.getChildren().forPath("/");
+      if (children.isEmpty()) {
+        System.out.println("Fluo instance (" + config.getInstanceZookeepers() + ") does not "
+            + "contain any Fluo applications.");
+        return;
+      }
+      Collections.sort(children);
+
+      System.out.println("Fluo instance (" + config.getInstanceZookeepers() + ") contains "
+          + children.size() + " application(s)\n");
+      System.out.println("Application     Status");
+      System.out.println("-----------     ------");
+      for (String path : children) {
+        FluoConfiguration appConfig = new FluoConfiguration(config);
+        appConfig.setApplicationName(path);
+        String state = "STOPPED";
+        if (FluoAdminImpl.oracleExists(appConfig)) {
+          state = "RUNNING";
+        }
+        System.out.format("%-15s %-11s\n", path, state);
+      }
+    }
+  }
+}
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoOracle.java b/modules/command/src/main/java/org/apache/fluo/command/FluoOracle.java
new file mode 100644
index 0000000..febaf6d
--- /dev/null
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoOracle.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.command;
+
+import java.io.File;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.util.UtilWaitThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FluoOracle {
+
+  private static final Logger log = LoggerFactory.getLogger(FluoOracle.class);
+
+  public static void main(String[] args) {
+    if (args.length != 2) {
+      System.err.println("Usage: FluoOracle <connectionPropsPath> <applicationName>");
+      System.exit(-1);
+    }
+    String connectionPropsPath = args[0];
+    String applicationName = args[1];
+    Objects.requireNonNull(connectionPropsPath);
+    Objects.requireNonNull(applicationName);
+    File connectionPropsFile = new File(connectionPropsPath);
+    Preconditions.checkArgument(connectionPropsFile.exists(), connectionPropsPath
+        + " does not exist");
+
+    try {
+      FluoConfiguration config = new FluoConfiguration(connectionPropsFile);
+      config.setApplicationName(applicationName);
+      CommandUtil.verifyAppInitialized(config);
+      org.apache.fluo.api.service.FluoOracle oracle = FluoFactory.newOracle(config);
+      oracle.start();
+      while (true) {
+        UtilWaitThread.sleep(10000);
+      }
+    } catch (Exception e) {
+      log.error("Exception running FluoOracle: ", e);
+    }
+  }
+}
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java b/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
new file mode 100644
index 0000000..432ffed
--- /dev/null
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoScan.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.command;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.base.Preconditions;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.util.ScanUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class FluoScan {
+
+  public static class ScanOptions {
+
+    @Parameter(names = "-s", description = "Start row (inclusive) of scan")
+    private String startRow;
+
+    @Parameter(names = "-e", description = "End row (inclusive) of scan")
+    private String endRow;
+
+    @Parameter(names = "-c", description = "Columns of scan in comma separated format: "
+        + "<<columnfamily>[:<columnqualifier>]{,<columnfamily>[:<columnqualifier>]}> ")
+    private List<String> columns;
+
+    @Parameter(names = "-r", description = "Exact row to scan")
+    private String exactRow;
+
+    @Parameter(names = "-p", description = "Row prefix to scan")
+    private String rowPrefix;
+
+    @Parameter(names = {"-h", "-help", "--help"}, help = true, description = "Prints help")
+    public boolean help;
+
+    @Parameter(names = {"-esc", "--escape-non-ascii"}, help = true,
+        description = "Hex encode non ascii bytes", arity = 1)
+    public boolean hexEncNonAscii = true;
+
+    @Parameter(
+        names = "--raw",
+        help = true,
+        description = "Show underlying key/values stored in Accumulo. Interprets the data using Fluo "
+            + "internal schema, making it easier to comprehend.")
+    public boolean scanAccumuloTable = false;
+
+    public String getStartRow() {
+      return startRow;
+    }
+
+    public String getEndRow() {
+      return endRow;
+    }
+
+    public String getExactRow() {
+      return exactRow;
+    }
+
+    public String getRowPrefix() {
+      return rowPrefix;
+    }
+
+    public List<String> getColumns() {
+      if (columns == null) {
+        return Collections.emptyList();
+      }
+      return columns;
+    }
+
+    public ScanUtil.ScanOpts getScanOpts() {
+      return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow, rowPrefix, help,
+          hexEncNonAscii, scanAccumuloTable);
+    }
+  }
+
+  public static void main(String[] args) {
+    if (args.length < 2) {
+      System.err.println("Usage: FluoScan <connectionPropsPath> <appName> userArgs...");
+      System.exit(-1);
+    }
+    final String connectionPropsPath = args[0];
+    final String applicationName = args[1];
+    Objects.requireNonNull(connectionPropsPath);
+    File connectionPropsFile = new File(connectionPropsPath);
+    Preconditions.checkArgument(connectionPropsFile.exists(), connectionPropsPath
+        + " does not exist");
+
+    String[] userArgs = Arrays.copyOfRange(args, 2, args.length);
+
+    Logger.getRootLogger().setLevel(Level.ERROR);
+    Logger.getLogger("org.apache.fluo").setLevel(Level.ERROR);
+
+    ScanOptions options = new ScanOptions();
+    JCommander jcommand = new JCommander(options);
+    jcommand.setProgramName("fluo scan <app>");
+    try {
+      jcommand.parse(userArgs);
+    } catch (ParameterException e) {
+      System.err.println(e.getMessage());
+      jcommand.usage();
+      System.exit(-1);
+    }
+
+    if (options.help) {
+      jcommand.usage();
+      System.exit(0);
+    }
+
+    FluoConfiguration config = new FluoConfiguration(connectionPropsFile);
+    config.setApplicationName(applicationName);
+    CommandUtil.verifyAppRunning(config);
+
+    if (options.scanAccumuloTable) {
+      ScanUtil.scanAccumulo(options.getScanOpts(), config);
+    } else {
+      ScanUtil.scanFluo(options.getScanOpts(), config);
+    }
+  }
+}
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
new file mode 100644
index 0000000..33fae21
--- /dev/null
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.command;
+
+import java.io.File;
+import java.util.Objects;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.core.client.FluoAdminImpl;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.impl.Notification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FluoWait {
+
+  private static final Logger log = LoggerFactory.getLogger(FluoWait.class);
+  private static final long MIN_SLEEP_SEC = 10;
+  private static final long MAX_SLEEP_SEC = 300;
+
+  private static long calculateSleep(long notifyCount) {
+    long sleep = notifyCount / 500;
+    if (sleep < MIN_SLEEP_SEC) {
+      return MIN_SLEEP_SEC;
+    } else if (sleep > MAX_SLEEP_SEC) {
+      return MAX_SLEEP_SEC;
+    }
+    return sleep;
+  }
+
+  @VisibleForTesting
+  public static long countNotifications(Environment env) {
+    Scanner scanner = null;
+    try {
+      scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
+    } catch (TableNotFoundException e) {
+      log.error("An exception was thrown -", e);
+      throw new FluoException(e);
+    }
+
+    Notification.configureScanner(scanner);
+
+    return Iterables.size(scanner);
+  }
+
+  public static void waitUntilFinished(FluoConfiguration config) {
+    try (Environment env = new Environment(config)) {
+      log.info("The wait command will exit when all notifications are processed");
+      while (true) {
+        long ts1 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
+        long ntfyCount = countNotifications(env);
+        long ts2 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
+        if (ntfyCount == 0 && ts1 == (ts2 - 1)) {
+          log.info("All processing has finished!");
+          break;
+        }
+
+        try {
+          long sleepSec = calculateSleep(ntfyCount);
+          log.info("{} notifications are still outstanding.  Will try again in {} seconds...",
+              ntfyCount, sleepSec);
+          Thread.sleep(1000 * sleepSec);
+        } catch (InterruptedException e) {
+          log.error("Sleep was interrupted!  Exiting...");
+          System.exit(-1);
+        }
+      }
+    } catch (FluoException e) {
+      log.error(e.getMessage());
+      System.exit(-1);
+    } catch (Exception e) {
+      log.error("An exception was thrown -", e);
+      System.exit(-1);
+    }
+  }
+
+
+  public static void main(String[] args) throws Exception {
+    if (args.length != 2) {
+      System.err.println("Usage: FluoWait <connectionPropsPath> <applicationName>");
+      System.exit(-1);
+    }
+    String connectionPropsPath = args[0];
+    String applicationName = args[1];
+    Objects.requireNonNull(connectionPropsPath);
+    File connectionPropsFile = new File(connectionPropsPath);
+    Preconditions.checkArgument(connectionPropsFile.exists(), connectionPropsPath
+        + " does not exist");
+
+    FluoConfiguration fluoConfig = new FluoConfiguration(connectionPropsFile);
+    fluoConfig.setApplicationName(applicationName);
+    CommandUtil.verifyAppRunning(fluoConfig);
+    fluoConfig = FluoAdminImpl.mergeZookeeperConfig(fluoConfig);
+
+    waitUntilFinished(fluoConfig);
+  }
+}
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoWorker.java b/modules/command/src/main/java/org/apache/fluo/command/FluoWorker.java
new file mode 100644
index 0000000..bb64446
--- /dev/null
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoWorker.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.command;
+
+import java.io.File;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.util.UtilWaitThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FluoWorker {
+
+  private static final Logger log = LoggerFactory.getLogger(FluoWorker.class);
+
+  public static void main(String[] args) {
+    if (args.length != 2) {
+      System.err.println("Usage: FluoWorker <connectionPropsPath> <applicationName>");
+      System.exit(-1);
+    }
+    String connectionPropsPath = args[0];
+    String applicationName = args[1];
+    Objects.requireNonNull(connectionPropsPath);
+    Objects.requireNonNull(applicationName);
+    File connectionPropsFile = new File(connectionPropsPath);
+    Preconditions.checkArgument(connectionPropsFile.exists(), connectionPropsPath
+        + " does not exist");
+
+    try {
+      FluoConfiguration config = new FluoConfiguration(connectionPropsFile);
+      config.setApplicationName(applicationName);
+      CommandUtil.verifyAppInitialized(config);
+      org.apache.fluo.api.service.FluoWorker worker = FluoFactory.newWorker(config);
+      worker.start();
+      while (true) {
+        UtilWaitThread.sleep(10000);
+      }
+    } catch (Exception e) {
+      log.error("Exception running FluoWorker: ", e);
+    }
+  }
+}
diff --git a/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java b/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
similarity index 92%
rename from modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java
rename to modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
index a84e8ea..810d689 100644
--- a/modules/cluster/src/test/java/org/apache/fluo/cluster/runner/ScanTest.java
+++ b/modules/command/src/test/java/org/apache/fluo/command/ScanTest.java
@@ -13,26 +13,28 @@
  * the License.
  */
 
-package org.apache.fluo.cluster.runner;
+package org.apache.fluo.command;
 
 import com.beust.jcommander.JCommander;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.core.impl.SnapshotScanner;
+import org.apache.fluo.core.util.ScanUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
 /**
- * Unit test for Scan command
+ * Unit test for ScanUtil
  */
 public class ScanTest {
 
   private SnapshotScanner.Opts parseArgs(String args) {
-    ScanOptions options = new ScanOptions();
+    FluoScan.ScanOptions options = new FluoScan.ScanOptions();
     JCommander jcommand = new JCommander(options);
     jcommand.parse(args.split(" "));
-    return new SnapshotScanner.Opts(AppRunner.getSpan(options), AppRunner.getColumns(options));
+    ScanUtil.ScanOpts opts = options.getScanOpts();
+    return new SnapshotScanner.Opts(ScanUtil.getSpan(opts), ScanUtil.getColumns(opts));
   }
 
   @Test
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 62914e0..8300863 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -15,7 +15,13 @@
 
 package org.apache.fluo.core.client;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -25,7 +31,10 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -39,11 +48,16 @@ import org.apache.fluo.accumulo.util.ZookeeperPath;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.core.impl.FluoConfigurationImpl;
 import org.apache.fluo.core.observer.ObserverUtil;
 import org.apache.fluo.core.util.AccumuloUtil;
 import org.apache.fluo.core.util.ByteUtil;
 import org.apache.fluo.core.util.CuratorUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -64,9 +78,7 @@ public class FluoAdminImpl implements FluoAdmin {
 
   public FluoAdminImpl(FluoConfiguration config) {
     this.config = config;
-    if (!config.hasRequiredAdminProps()) {
-      throw new IllegalArgumentException("Admin configuration is missing required properties");
-    }
+
 
     appRootDir = ZookeeperUtil.parseRoot(config.getAppZookeepers());
     rootCurator = CuratorUtil.newRootFluoCurator(config);
@@ -84,10 +96,17 @@ public class FluoAdminImpl implements FluoAdmin {
   @Override
   public void initialize(InitializationOptions opts) throws AlreadyInitializedException,
       TableExistsException {
+    if (!config.hasRequiredAdminProps()) {
+      throw new IllegalArgumentException("Admin configuration is missing required properties");
+    }
     Preconditions.checkArgument(!ZookeeperUtil.parseRoot(config.getInstanceZookeepers())
-        .equals("/"), "The Zookeeper connection string (set by 'fluo.client.zookeeper.connect') "
+        .equals("/"), "The Zookeeper connection string (set by 'fluo.connection.zookeepers') "
         + " must have a chroot suffix.");
 
+    Preconditions.checkArgument(config.getObserverJarsUrl().isEmpty()
+        || config.getObserverInitDir().isEmpty(),
+        "Only one of 'fluo.observer.init.dir' and 'fluo.observer.jars.url' can be set");
+
     if (zookeeperInitialized() && !opts.getClearZookeeper()) {
       throw new AlreadyInitializedException("Fluo application already initialized at "
           + config.getAppZookeepers());
@@ -128,19 +147,38 @@ public class FluoAdminImpl implements FluoAdmin {
 
     try {
       initialize(conn);
-      updateSharedConfig();
 
-      if (!config.getAccumuloClasspath().trim().isEmpty()) {
-        // TODO add fluo version to context name to make it unique
-        String contextName = "fluo";
+      String accumuloJars;
+      if (!config.getAccumuloJars().trim().isEmpty()) {
+        accumuloJars = config.getAccumuloJars().trim();
+      } else {
+        accumuloJars = getJarsFromClasspath();
+      }
+
+      String accumuloClasspath;
+      if (!accumuloJars.isEmpty()) {
+        accumuloClasspath = copyJarsToDfs(accumuloJars, "lib/accumulo");
+      } else {
+        accumuloClasspath = config.getAccumuloClasspath().trim();
+      }
+
+      if (!accumuloClasspath.isEmpty()) {
+        String contextName = "fluo-" + config.getApplicationName();
         conn.instanceOperations().setProperty(
-            AccumuloProps.VFS_CONTEXT_CLASSPATH_PROPERTY + "fluo", config.getAccumuloClasspath());
+            AccumuloProps.VFS_CONTEXT_CLASSPATH_PROPERTY + contextName, accumuloClasspath);
         conn.tableOperations().setProperty(config.getAccumuloTable(),
             AccumuloProps.TABLE_CLASSPATH, contextName);
       }
 
+      if (config.getObserverJarsUrl().isEmpty() && !config.getObserverInitDir().trim().isEmpty()) {
+        String observerUrl = copyDirToDfs(config.getObserverInitDir().trim(), "lib/observers");
+        config.setObserverJarsUrl(observerUrl);
+      }
+
       conn.tableOperations().setProperty(config.getAccumuloTable(),
           AccumuloProps.TABLE_BLOCKCACHE_ENABLED, "true");
+
+      updateSharedConfig();
     } catch (NodeExistsException nee) {
       throw new AlreadyInitializedException();
     } catch (Exception e) {
@@ -202,14 +240,14 @@ public class FluoAdminImpl implements FluoAdmin {
 
   @Override
   public void updateSharedConfig() {
-
+    if (!config.hasRequiredAdminProps()) {
+      throw new IllegalArgumentException("Admin configuration is missing required properties");
+    }
     Properties sharedProps = new Properties();
     Iterator<String> iter = config.getKeys();
     while (iter.hasNext()) {
       String key = iter.next();
-      if (key.equals(FluoConfiguration.TRANSACTION_ROLLBACK_TIME_PROP)) {
-        sharedProps.setProperty(key, Long.toString(config.getLong(key)));
-      } else if (key.startsWith(FluoConfiguration.APP_PREFIX)) {
+      if (!key.startsWith(FluoConfiguration.CONNECTION_PREFIX)) {
         sharedProps.setProperty(key, config.getRawString(key));
       }
     }
@@ -229,6 +267,111 @@ public class FluoAdminImpl implements FluoAdmin {
   }
 
   @Override
+  public SimpleConfiguration getConnectionConfig() {
+    return new SimpleConfiguration(config);
+  }
+
+  @Override
+  public SimpleConfiguration getApplicationConfig() {
+    return getZookeeperConfig(config);
+  }
+
+  private String copyDirToDfs(String srcDir, String destDir) {
+    return copyDirToDfs(config.getDfsRoot(), config.getApplicationName(), srcDir, destDir);
+  }
+
+  @VisibleForTesting
+  public static String copyDirToDfs(String dfsRoot, String appName, String srcDir, String destDir) {
+    String dfsAppRoot = dfsRoot + "/" + appName;
+    String dfsDestDir = dfsAppRoot + "/" + destDir;
+
+    FileSystem fs;
+    try {
+      fs = FileSystem.get(new URI(dfsRoot), new Configuration());
+      fs.delete(new Path(dfsDestDir), true);
+      fs.mkdirs(new Path(dfsAppRoot));
+      fs.copyFromLocalFile(new Path(srcDir), new Path(dfsDestDir));
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+    return dfsDestDir;
+  }
+
+  private String copyJarsToDfs(String jars, String destDir) {
+    String dfsAppRoot = config.getDfsRoot() + "/" + config.getApplicationName();
+    String dfsDestDir = dfsAppRoot + "/" + destDir;
+
+    FileSystem fs;
+    try {
+      fs = FileSystem.get(new URI(config.getDfsRoot()), new Configuration());
+      fs.mkdirs(new Path(dfsDestDir));
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+
+    StringBuilder classpath = new StringBuilder();
+    for (String jarPath : jars.split(",")) {
+      File jarFile = new File(jarPath);
+      String jarName = jarFile.getName();
+      try {
+        fs.copyFromLocalFile(new Path(jarPath), new Path(dfsDestDir));
+      } catch (IOException e) {
+        logger.error("Failed to copy file {} to DFS directory {}", jarPath, dfsDestDir);
+        throw new IllegalStateException(e);
+      }
+      if (classpath.length() != 0) {
+        classpath.append(",");
+      }
+      classpath.append(dfsDestDir + "/" + jarName);
+    }
+    return classpath.toString();
+  }
+
+  public static boolean isInitialized(FluoConfiguration config) {
+    try (CuratorFramework rootCurator = CuratorUtil.newRootFluoCurator(config)) {
+      rootCurator.start();
+      String appRootDir = ZookeeperUtil.parseRoot(config.getAppZookeepers());
+      return rootCurator.checkExists().forPath(appRootDir) != null;
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public static FluoConfiguration mergeZookeeperConfig(FluoConfiguration config) {
+    SimpleConfiguration zooConfig = getZookeeperConfig(config);
+    FluoConfiguration copy = new FluoConfiguration(config);
+    for (Map.Entry<String, String> entry : zooConfig.toMap().entrySet()) {
+      copy.setProperty(entry.getKey(), entry.getValue());
+    }
+    return copy;
+  }
+
+  public static SimpleConfiguration getZookeeperConfig(FluoConfiguration config) {
+    if (!isInitialized(config)) {
+      throw new IllegalStateException("Fluo Application '" + config.getApplicationName()
+          + "' has not been initialized");
+    }
+
+    SimpleConfiguration zooConfig = new SimpleConfiguration();
+
+    try (CuratorFramework curator = CuratorUtil.newAppCurator(config)) {
+      curator.start();
+
+      ByteArrayInputStream bais =
+          new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_SHARED));
+      Properties sharedProps = new Properties();
+      sharedProps.load(bais);
+
+      for (String prop : sharedProps.stringPropertyNames()) {
+        zooConfig.setProperty(prop, sharedProps.getProperty(prop));
+      }
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+    return zooConfig;
+  }
+
+  @Override
   public void close() {
     rootCurator.close();
     if (appCurator != null) {
@@ -236,6 +379,38 @@ public class FluoAdminImpl implements FluoAdmin {
     }
   }
 
+  private String getJarsFromClasspath() {
+    StringBuilder jars = new StringBuilder();
+    ClassLoader cl = FluoAdminImpl.class.getClassLoader();
+    URL[] urls = ((URLClassLoader) cl).getURLs();
+
+    String regex =
+        config.getString(FluoConfigurationImpl.ACCUMULO_JARS_REGEX_PROP,
+            FluoConfigurationImpl.ACCUMULO_JARS_REGEX_DEFAULT);
+    Pattern pattern = Pattern.compile(regex);
+
+    for (URL url : urls) {
+      String jarName = new File(url.getFile()).getName();
+      if (pattern.matcher(jarName).matches()) {
+        if (jars.length() != 0) {
+          jars.append(",");
+        }
+        jars.append(url.getFile());
+      }
+    }
+    return jars.toString();
+  }
+
+  public static boolean oracleExists(FluoConfiguration config) {
+    try (CuratorFramework curator = CuratorUtil.newAppCurator(config)) {
+      curator.start();
+      return curator.checkExists().forPath(ZookeeperPath.ORACLE_SERVER) != null
+          && !curator.getChildren().forPath(ZookeeperPath.ORACLE_SERVER).isEmpty();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   public boolean oracleExists() {
     CuratorFramework curator = getAppCurator();
     try {
@@ -255,6 +430,9 @@ public class FluoAdminImpl implements FluoAdmin {
   }
 
   public boolean accumuloTableExists() {
+    if (!config.hasRequiredAdminProps()) {
+      throw new IllegalArgumentException("Admin configuration is missing required properties");
+    }
     Connector conn = AccumuloUtil.getConnector(config);
     return conn.tableOperations().exists(config.getAccumuloTable());
   }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java
index 11b99e1..c1cf6d8 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoClientImpl.java
@@ -15,8 +15,11 @@
 
 package org.apache.fluo.core.client;
 
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.LoaderExecutor;
 import org.apache.fluo.api.client.Snapshot;
@@ -49,19 +52,17 @@ public class FluoClientImpl implements FluoClient {
         + reporterCounter.getAndIncrement());
   }
 
-  public FluoClientImpl(FluoConfiguration config) {
-    this.config = config;
-    if (!config.hasRequiredClientProps()) {
-      String msg = "Client configuration is missing required properties";
-      log.error(msg);
-      throw new IllegalArgumentException(msg);
-    }
+  public FluoClientImpl(FluoConfiguration connConfig) {
+    Objects.requireNonNull(connConfig);
+    Preconditions.checkArgument(connConfig.hasRequiredConnectionProps(),
+        "missing required connection properties");
+    config = FluoAdminImpl.mergeZookeeperConfig(connConfig);
+    Preconditions.checkArgument(config.hasRequiredClientProps());
     try {
       this.env = new Environment(config);
     } catch (Exception e) {
       throw new IllegalStateException(e);
     }
-
     reporter = setupReporters(env, "client", reporterCounter);
   }
 
@@ -119,4 +120,9 @@ public class FluoClientImpl implements FluoClient {
       throw new RuntimeException(e);
     }
   }
+
+  @VisibleForTesting
+  public FluoConfiguration getSharedConfiguration() {
+    return config;
+  }
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
index 4ad6ec9..2bfdbd7 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
@@ -15,11 +15,9 @@
 
 package org.apache.fluo.core.impl;
 
-import java.io.ByteArrayInputStream;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
-import java.util.Properties;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.accumulo.core.client.Connector;
@@ -30,6 +28,7 @@ import org.apache.fluo.accumulo.util.ZookeeperPath;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.metrics.MetricsReporter;
+import org.apache.fluo.core.client.FluoAdminImpl;
 import org.apache.fluo.core.metrics.MetricNames;
 import org.apache.fluo.core.metrics.MetricsReporterImpl;
 import org.apache.fluo.core.observer.RegisteredObservers;
@@ -125,19 +124,10 @@ public class Environment implements AutoCloseable {
 
       observers = ObserverUtil.load(curator);
 
-      ByteArrayInputStream bais =
-          new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_SHARED));
-      Properties sharedProps = new Properties();
-      sharedProps.load(bais);
-
-      FluoConfiguration tmpConfig = new FluoConfiguration();
-      for (String prop : sharedProps.stringPropertyNames()) {
-        config.setProperty(prop, sharedProps.getProperty(prop));
-        tmpConfig.setProperty(prop, sharedProps.getProperty(prop));
-      }
+      config = FluoAdminImpl.mergeZookeeperConfig(config);
 
       // make sure not to include config passed to env, only want config from zookeeper
-      appConfig = tmpConfig.getAppConfiguration();
+      appConfig = config.getAppConfiguration();
     } catch (Exception e) {
       throw new IllegalStateException(e);
     }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
index 9af6feb..e3805dc 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
@@ -38,6 +38,9 @@ public class FluoConfigurationImpl {
       + ".worker.finder.maxSleep";
   public static final int NTFY_FINDER_MAX_SLEEP_TIME_DEFAULT = 5 * 60 * 1000;
 
+  public static final String ACCUMULO_JARS_REGEX_PROP = FLUO_IMPL_PREFIX + ".accumulo.jars.regex";
+  public static final String ACCUMULO_JARS_REGEX_DEFAULT = "^fluo-(api|accumulo).*";
+
   // Time period that each client will update ZK with their oldest active timestamp
   // If period is too short, Zookeeper may be overloaded. If too long, garbage collection
   // may keep older versions of table data unnecessarily.
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
index 6f814b3..c2fc35a 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
@@ -86,7 +86,7 @@ public class ObserverStoreV1 implements ObserverStore {
       logger.info("Setting up observer {} using params {}.", observer.getClass().getSimpleName(),
           oc.toMap());
       try {
-        observer.init(new ObserverContext(config.subset(FluoConfiguration.APP_PREFIX), oc));
+        observer.init(new ObserverContext(config.getAppConfiguration(), oc));
       } catch (Exception e) {
         throw new FluoException("Observer '" + ospec.getClassName() + "' could not be initialized",
             e);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/FluoOracleImpl.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/FluoOracleImpl.java
index c0f5f11..e04b174 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/FluoOracleImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/FluoOracleImpl.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.api.service.FluoOracle;
+import org.apache.fluo.core.client.FluoAdminImpl;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.metrics.ReporterUtil;
 import org.apache.fluo.core.util.CuratorUtil;
@@ -40,17 +41,18 @@ public class FluoOracleImpl implements FluoOracle {
   private OracleServer oracleServer;
   private NodeCache appIdCache;
 
-  public FluoOracleImpl(FluoConfiguration config) {
-    Objects.requireNonNull(config);
+  public FluoOracleImpl(FluoConfiguration connConfig) {
+    Objects.requireNonNull(connConfig);
+    Preconditions.checkArgument(connConfig.hasRequiredConnectionProps());
+    config = FluoAdminImpl.mergeZookeeperConfig(connConfig);
     Preconditions.checkArgument(config.hasRequiredOracleProps());
     // any client in oracle should retry forever
-    config.setClientRetryTimeout(-1);
+    config.setConnectionRetryTimeout(-1);
     try {
       config.validate();
     } catch (Exception e) {
       throw new IllegalArgumentException("Invalid FluoConfiguration", e);
     }
-    this.config = config;
   }
 
   @Override
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
index d8eaed4..4afb0a1 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
@@ -365,7 +365,7 @@ public class OracleClient implements AutoCloseable {
     TimeRequest tr = new TimeRequest();
     try {
       queue.put(tr);
-      int timeout = env.getConfiguration().getClientRetryTimeout();
+      int timeout = env.getConfiguration().getConnectionRetryTimeout();
       if (timeout < 0) {
         long waitPeriod = 1;
         long waitTotal = 0;
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
similarity index 55%
copy from modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
copy to modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
index a984469..a9f2e22 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
@@ -13,24 +13,16 @@
  * the License.
  */
 
-package org.apache.fluo.cluster.runner;
+package org.apache.fluo.core.util;
 
-import java.lang.reflect.Method;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 
-import javax.inject.Provider;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.ParameterException;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.accumulo.format.FluoFormatter;
 import org.apache.fluo.api.client.FluoClient;
@@ -43,32 +35,10 @@ import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.cluster.util.FluoYarnConfig;
-import org.apache.fluo.core.impl.Environment;
-import org.apache.fluo.core.impl.Notification;
-import org.apache.fluo.core.util.AccumuloUtil;
-import org.apache.fluo.core.util.ByteUtil;
-import org.apache.fluo.core.util.Hex;
-import org.apache.fluo.core.util.SpanUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for running a Fluo application
- */
-public abstract class AppRunner {
 
-  private static final Logger log = LoggerFactory.getLogger(AppRunner.class);
-  private static final long MIN_SLEEP_SEC = 10;
-  private static final long MAX_SLEEP_SEC = 300;
-
-  private String scriptName;
-
-  public AppRunner(String scriptName) {
-    this.scriptName = scriptName;
-  }
+public class ScanUtil {
 
-  public static Span getSpan(ScanOptions options) {
+  public static Span getSpan(ScanOpts options) {
     Span span = new Span();
     if ((options.getExactRow() != null)
         && ((options.getStartRow() != null) || (options.getEndRow() != null) || (options
@@ -102,7 +72,7 @@ public abstract class AppRunner {
     return span;
   }
 
-  public static Collection<Column> getColumns(ScanOptions options) {
+  public static Collection<Column> getColumns(ScanOpts options) {
     Collection<Column> columns = new HashSet<>();
 
     // configure columns of scanner
@@ -121,33 +91,7 @@ public abstract class AppRunner {
     return columns;
   }
 
-
-
-  public long scan(FluoConfiguration config, String[] args) {
-    ScanOptions options = new ScanOptions();
-    JCommander jcommand = new JCommander(options);
-    jcommand.setProgramName(scriptName + " scan <app>");
-    try {
-      jcommand.parse(args);
-    } catch (ParameterException e) {
-      System.err.println(e.getMessage());
-      jcommand.usage();
-      System.exit(-1);
-    }
-
-    if (options.help) {
-      jcommand.usage();
-      System.exit(0);
-    }
-
-    if (options.scanAccumuloTable) {
-      return scanAccumulo(options, config);
-    } else {
-      return scanFluo(options, config);
-    }
-  }
-
-  private long scanFluo(ScanOptions options, FluoConfiguration sConfig) {
+  public static void scanFluo(ScanOpts options, FluoConfiguration sConfig) {
     System.out.println("Scanning snapshot of data in Fluo '" + sConfig.getApplicationName()
         + "' application.");
 
@@ -199,10 +143,9 @@ public abstract class AppRunner {
         System.out.println("Scan failed - " + e.getMessage());
       }
     }
-    return entriesFound;
   }
 
-  private long scanAccumulo(ScanOptions options, FluoConfiguration sConfig) {
+  public static void scanAccumulo(ScanOpts options, FluoConfiguration sConfig) {
     System.out.println("Scanning data in Accumulo directly for '" + sConfig.getApplicationName()
         + "' application.");
 
@@ -218,8 +161,6 @@ public abstract class AppRunner {
       System.exit(-1);
     }
 
-    long entriesFound = 0;
-
     try {
       Scanner scanner = conn.createScanner(sConfig.getAccumuloTable(), Authorizations.EMPTY);
       scanner.setRange(SpanUtil.toRange(span));
@@ -237,103 +178,53 @@ public abstract class AppRunner {
       }
     } catch (Exception e) {
       System.out.println("Scan failed - " + e.getMessage());
-      entriesFound++;
     }
-
-    return entriesFound;
   }
 
-  private long calculateSleep(long notifyCount, long numWorkers) {
-    long sleep = notifyCount / numWorkers / 100;
-    if (sleep < MIN_SLEEP_SEC) {
-      return MIN_SLEEP_SEC;
-    } else if (sleep > MAX_SLEEP_SEC) {
-      return MAX_SLEEP_SEC;
+  public static class ScanOpts {
+
+    private String startRow;
+    private String endRow;
+    private List<String> columns;
+    private String exactRow;
+    private String rowPrefix;
+    public boolean help;
+    public boolean hexEncNonAscii = true;
+    public boolean scanAccumuloTable = false;
+
+    public ScanOpts(String startRow, String endRow, List<String> columns, String exactRow,
+        String rowPrefix, boolean help, boolean hexEncNonAscii, boolean scanAccumuloTable) {
+      this.startRow = startRow;
+      this.endRow = endRow;
+      this.columns = columns;
+      this.exactRow = exactRow;
+      this.rowPrefix = rowPrefix;
+      this.help = help;
+      this.hexEncNonAscii = hexEncNonAscii;
+      this.scanAccumuloTable = scanAccumuloTable;
     }
-    return sleep;
-  }
 
-  @VisibleForTesting
-  public long countNotifications(Environment env) {
-    Scanner scanner = null;
-    try {
-      scanner = env.getConnector().createScanner(env.getTable(), env.getAuthorizations());
-    } catch (TableNotFoundException e) {
-      log.error("An exception was thrown -", e);
-      throw new FluoException(e);
+    public String getStartRow() {
+      return startRow;
     }
 
-    Notification.configureScanner(scanner);
-
-    return Iterables.size(scanner);
-  }
-
-  public void waitUntilFinished(FluoConfiguration config) {
-    try (Environment env = new Environment(config)) {
-      log.info("The wait command will exit when all notifications are processed");
-      while (true) {
-        long ts1 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
-        long ntfyCount = countNotifications(env);
-        long ts2 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
-        if (ntfyCount == 0 && ts1 == (ts2 - 1)) {
-          log.info("All processing has finished!");
-          break;
-        }
-
-        try {
-          long sleepSec = calculateSleep(ntfyCount, FluoYarnConfig.getWorkerInstances(config));
-          log.info("{} notifications are still outstanding.  Will try again in {} seconds...",
-              ntfyCount, sleepSec);
-          Thread.sleep(1000 * sleepSec);
-        } catch (InterruptedException e) {
-          log.error("Sleep was interrupted!  Exiting...");
-          System.exit(-1);
-        }
-      }
-    } catch (FluoException e) {
-      log.error(e.getMessage());
-      System.exit(-1);
-    } catch (Exception e) {
-      log.error("An exception was thrown -", e);
-      System.exit(-1);
+    public String getEndRow() {
+      return endRow;
     }
-  }
-
-  private static class FluoConfigModule extends AbstractModule {
-
-    private Class<?> clazz;
-    private FluoConfiguration fluoConfig;
 
-    FluoConfigModule(Class<?> clazz, FluoConfiguration fluoConfig) {
-      this.clazz = clazz;
-      this.fluoConfig = fluoConfig;
+    public String getExactRow() {
+      return exactRow;
     }
 
-    @Override
-    protected void configure() {
-      requestStaticInjection(clazz);
-      bind(FluoConfiguration.class).toProvider(new Provider<FluoConfiguration>() {
-        @Override
-        public FluoConfiguration get() {
-          // TODO Auto-generated method stub
-          return fluoConfig;
-        }
-      });
+    public String getRowPrefix() {
+      return rowPrefix;
     }
 
-  }
-
-  public void exec(FluoConfiguration fluoConfig, String[] args) throws Exception {
-
-    String className = args[0];
-    Arrays.copyOfRange(args, 1, args.length);
-
-    Class<?> clazz = Class.forName(className);
-
-    // inject fluo configuration
-    Guice.createInjector(new FluoConfigModule(clazz, fluoConfig));
-
-    Method method = clazz.getMethod("main", String[].class);
-    method.invoke(null, (Object) Arrays.copyOfRange(args, 1, args.length));
+    public List<String> getColumns() {
+      if (columns == null) {
+        return Collections.emptyList();
+      }
+      return columns;
+    }
   }
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/FluoWorkerImpl.java b/modules/core/src/main/java/org/apache/fluo/core/worker/FluoWorkerImpl.java
index 47c9333..f95d0b0 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/FluoWorkerImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/FluoWorkerImpl.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.api.service.FluoWorker;
+import org.apache.fluo.core.client.FluoAdminImpl;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.metrics.ReporterUtil;
 import org.apache.fluo.core.util.CuratorUtil;
@@ -41,10 +42,11 @@ public class FluoWorkerImpl implements FluoWorker {
   private NotificationFinder notificationFinder;
   private NodeCache appIdCache;
 
-  public FluoWorkerImpl(FluoConfiguration config) {
-    Objects.requireNonNull(config);
+  public FluoWorkerImpl(FluoConfiguration connConfig) {
+    Objects.requireNonNull(connConfig);
+    Preconditions.checkArgument(connConfig.hasRequiredConnectionProps());
+    config = FluoAdminImpl.mergeZookeeperConfig(connConfig);
     Preconditions.checkArgument(config.hasRequiredWorkerProps());
-    this.config = config;
   }
 
   @Override
diff --git a/modules/distribution/pom.xml b/modules/distribution/pom.xml
index 10b9b0b..2808edf 100644
--- a/modules/distribution/pom.xml
+++ b/modules/distribution/pom.xml
@@ -45,6 +45,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.fluo</groupId>
+      <artifactId>fluo-command</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.fluo</groupId>
       <artifactId>fluo-core</artifactId>
       <optional>true</optional>
     </dependency>
diff --git a/modules/distribution/src/main/assembly/bin.xml b/modules/distribution/src/main/assembly/bin.xml
index c745003..f4c53ff 100644
--- a/modules/distribution/src/main/assembly/bin.xml
+++ b/modules/distribution/src/main/assembly/bin.xml
@@ -49,15 +49,14 @@
       <includes>
         <include>*/**</include>
       </includes>
-      <excludes>
-        <exclude>src/main/scripts/impl/fluo-version.sh</exclude>
-      </excludes>
+      <filtered>true</filtered>
     </fileSet>
     <fileSet>
       <directory>src/main/config</directory>
-      <outputDirectory>conf/examples</outputDirectory>
+      <outputDirectory>conf</outputDirectory>
       <excludes>
-        <exclude>src/main/config/fluo.properties</exclude>
+        <exclude>src/main/config/fluo.properties.deprecated</exclude>
+        <exclude>src/main/config/fluo-app.properties</exclude>
       </excludes>
     </fileSet>
     <fileSet>
@@ -90,13 +89,13 @@
   </fileSets>
   <files>
     <file>
-      <source>src/main/config/fluo.properties</source>
-      <outputDirectory>conf/examples</outputDirectory>
+      <source>src/main/config/fluo.properties.deprecated</source>
+      <outputDirectory>conf</outputDirectory>
       <filtered>true</filtered>
     </file>
     <file>
-      <source>src/main/scripts/impl/fluo-version.sh</source>
-      <outputDirectory>bin/impl</outputDirectory>
+      <source>src/main/config/fluo-app.properties</source>
+      <outputDirectory>conf</outputDirectory>
       <filtered>true</filtered>
     </file>
   </files>
diff --git a/modules/distribution/src/main/config/fluo-app.properties b/modules/distribution/src/main/config/fluo-app.properties
new file mode 100644
index 0000000..54d41ee
--- /dev/null
+++ b/modules/distribution/src/main/config/fluo-app.properties
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+# agreements.  See the NOTICE file distributed with this work for additional information regarding
+# copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with the License.  You may
+# obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
+
+#############################
+# Fluo Application properties
+#############################
+
+# NOTE - All properties that have a default are set with it. Uncomment
+# a property if you want to use a value different than the default. 
+# Properties that have no default are uncommented and must be set by
+# the user.
+
+## Application properties
+## ----------------------
+## Specifies an observer provider.  This should be the name of a class that
+## implements org.apache.fluo.api.observer.ObserverProvider.
+#fluo.observer.provider=com.foo.AppObserverProvider
+## Observer jars in this directory are copied to the DFS (specified by fluo.dfs.root) during initialization. 
+## If this property is set, fluo.observer.jars.url should not be set.
+#fluo.observer.init.dir=/path/to/observer/jars/
+## Observer jars are retreived from this URL. If set, fluo.observer.init.dir should not be set.
+#fluo.observer.jars.url=hdfs://localhost:8020/path/to/observer/jars/
+## Properties with a prefix of fluo.app.* can easily be retrieved by a Fluo
+## application running on any node in the cluster.
+#fluo.app.key1=val1
+
+## DFS properties
+## ---------------
+## Fluo DFS root path. Should be prefixed with 'fs.defaultFS' property in Hadoop's core-site.xml
+fluo.dfs.root=hdfs://localhost:8020/fluo
+
+## Accumulo properties
+## -------------------
+## Accumulo instance to connect to
+fluo.accumulo.instance=
+## Accumulo table to initialize
+fluo.accumulo.table=\${fluo.connection.application.name}
+## Accumulo user
+fluo.accumulo.user=
+## Accumulo password
+fluo.accumulo.password=
+## Accumulo zookeepers
+#fluo.accumulo.zookeepers=localhost
+## Optional - List of jars to provide to Accumulo. If not set, Fluo will find jars on classpath.
+#fluo.accumulo.jars=/path/to/a.jar,/path/to/b.jar
+
+# Transaction properties
+# ----------------------
+# Amount of time (in milliseconds) clients wait before rolling back transaction
+#fluo.tx.rollback.time=300000
+
+## Worker properties
+## -----------------
+# Number of threads in each worker instance
+#fluo.worker.num.threads=10
+
+## Loader properties
+## -----------------
+## Number of threads each loader runs.  Can set to zero for no threads, thread
+## adding Loader will execute.  Must also set fluo.loader.queue.size to zero
+## when setting this to zero.
+#fluo.loader.num.threads=10
+## Queue size of loader
+#fluo.loader.queue.size=10
+
+## Metrics
+## -------
+## Configure reporters for metrics. The frequency for each type of reporter is in seconds.
+
+#fluo.metrics.reporter.console.enable=false
+#fluo.metrics.reporter.console.target=stdout
+#fluo.metrics.reporter.console.rateUnit=seconds
+#fluo.metrics.reporter.console.durationUnit=milliseconds
+#fluo.metrics.reporter.console.frequency=60
+
+#fluo.metrics.reporter.csv.enable=false
+#fluo.metrics.reporter.csv.dir=/tmp/
+#fluo.metrics.reporter.csv.rateUnit=seconds
+#fluo.metrics.reporter.csv.durationUnit=milliseconds
+#fluo.metrics.reporter.csv.frequency=60
+
+#fluo.metrics.reporter.graphite.enable=false
+#fluo.metrics.reporter.graphite.host=carbon.server.com
+#fluo.metrics.reporter.graphite.port=8080
+#fluo.metrics.reporter.graphite.rateUnit=seconds
+#fluo.metrics.reporter.graphite.durationUnit=milliseconds
+#fluo.metrics.reporter.graphite.frequency=60
+#fluo.metrics.reporter.graphite.prefix=
+
+#fluo.metrics.reporter.jmx.enable=false
+#fluo.metrics.reporter.jmx.rateUnit=seconds
+#fluo.metrics.reporter.jmx.durationUnit=milliseconds
+
+#fluo.metrics.reporter.slf4j.enable=false
+#fluo.metrics.reporter.slf4j.logger=metrics
+#fluo.metrics.reporter.slf4j.rateUnit=seconds
+#fluo.metrics.reporter.slf4j.durationUnit=milliseconds
diff --git a/modules/distribution/src/main/config/fluo-conn.properties b/modules/distribution/src/main/config/fluo-conn.properties
new file mode 100644
index 0000000..52db6b2
--- /dev/null
+++ b/modules/distribution/src/main/config/fluo-conn.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+# agreements.  See the NOTICE file distributed with this work for additional information regarding
+# copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with the License.  You may
+# obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
+
+############################
+# Fluo Connection properties
+############################
+
+# NOTE - All properties that have a default are set with it.  Uncomment
+# a property if you want to use a value different than the default. 
+# Properties that have no default are uncommented and must be set by
+# the user.
+
+## Zookeeper connection string specifying host and chroot where Fluo stores data.
+## A chroot directory suffix must be specified but doesn't need to be named
+## '/fluo'.  If not specified, a Fluo application cannot be initialized.
+#fluo.connection.zookeepers=localhost/fluo
+## Zookeeper timeout
+#fluo.connection.zookeeper.timeout=30000
+## Connection retry timeout (in milliseconds). Set to -1 to retry forever.
+#fluo.connection.retry.timeout.ms=-1
diff --git a/modules/distribution/src/main/config/fluo-env.sh b/modules/distribution/src/main/config/fluo-env.sh
index 118b85c..eac5ed8 100755
--- a/modules/distribution/src/main/config/fluo-env.sh
+++ b/modules/distribution/src/main/config/fluo-env.sh
@@ -11,11 +11,43 @@
 # or implied. See the License for the specific language governing permissions and limitations under
 # the License.
 
-# Sets HADOOP_PREFIX if it is not already set.  Please modify the
-# export statement to use the correct directory.  Remove the test
-# statement to override any previously set environment.
+## Before fluo-env.sh is loaded, these environment variables are set and can be used in this file:
 
-test -z "$HADOOP_PREFIX" && export HADOOP_PREFIX=/path/to/hadoop
+# cmd - Command that is being called such as oracle, worker, etc.
+# app - Fluo application name 
+# basedir - Root of Fluo installation
+# conf - Directory containing Fluo configuration
+# lib - Directory containing Fluo libraries
+
+####################################
+# General variables that must be set
+####################################
+
+## Hadoop installation
+export HADOOP_PREFIX="${HADOOP_PREFIX:-/path/to/hadoop}"
+## Fluo connection properties
+export FLUO_CONN_PROPS="${FLUO_CONN_PROPS:-${conf}/fluo-conn.properties}"
+
+###########################################################
+# Variables for running Fluo services (i.e oracle, worker).
+# Defaults below work but can be edited.
+###########################################################
+
+## Fluo logs directory. Referenced by logger config.
+export FLUO_LOG_DIR="${FLUO_LOG_DIR:-${basedir}/logs}"
+## Fluo log4j configuration
+export FLUO_LOG4J_CONFIG="${FLUO_LOG4J_CONFIG:-${conf}/log4j.properties}"
+## Fluo log identifier
+export FLUO_LOG_ID="${cmd}_$(hostname)_$(date +%s)"
+## Java options for Fluo services
+SERVICE_OPTS=("-Dlog4j.configuration=file:${FLUO_LOG4J_CONFIG}"
+           "-Dfluo.log.dir=${FLUO_LOG_DIR}/${app}"
+           "-Dfluo.log.id=${FLUO_LOG_ID}")
+export SERVICE_OPTS
+
+##########################
+# Build CLASSPATH variable
+##########################
 
 # The classpath for Fluo must be defined.  The Fluo tarball does not include
 # jars for Accumulo, Zookeeper, or Hadoop.  This example env file offers two
@@ -47,7 +79,13 @@ setupClasspathFromSystem()
   test -z "$ACCUMULO_HOME" && ACCUMULO_HOME=/path/to/accumulo
   test -z "$ZOOKEEPER_HOME" && ZOOKEEPER_HOME=/path/to/zookeeper
 
-  CLASSPATH="$FLUO_HOME/lib/*:$FLUO_HOME/lib/logback/*"
+  CLASSPATH="$lib/*"
+  # If fluo-conn.properties exists, then classpath does not need to include twill or logback
+  if [ -f "$FLUO_CONN_PROPS" ]; then
+    CLASSPATH="$CLASSPATH:$lib/log4j/*"
+  else
+    CLASSPATH="$CLASSPATH:$lib/twill/*:$lib/logback/*"
+  fi
 
   #any jars matching this pattern is excluded from classpath
   EXCLUDE_RE="(.*log4j.*)|(.*asm.*)|(.*guava.*)|(.*gson.*)"
@@ -61,14 +99,22 @@ setupClasspathFromSystem()
   addToClasspath "$HADOOP_PREFIX/share/hadoop/hdfs/lib" $EXCLUDE_RE;
   addToClasspath "$HADOOP_PREFIX/share/hadoop/yarn" $EXCLUDE_RE;
   addToClasspath "$HADOOP_PREFIX/share/hadoop/yarn/lib" $EXCLUDE_RE;
+  export CLASSPATH
 }
 
 
 # This function obtains Accumulo, Hadoop, and Zookeeper jars from
-# $FLUO_HOME/lib/ahz/. Before using this function, make sure you run
+# $lib/ahz/. Before using this function, make sure you run
 # `./lib/fetch.sh ahz` to download dependencies to this directory.
 setupClasspathFromLib(){
-  CLASSPATH="$FLUO_HOME/lib/*:$FLUO_HOME/lib/logback/*:$FLUO_HOME/lib/ahz/*"
+  CLASSPATH="$lib/*"
+  if [ -f "$FLUO_CONN_PROPS" ]; then
+    CLASSPATH="$CLASSPATH:$lib/log4j/*"
+  else
+    CLASSPATH="$CLASSPATH:$lib/twill/*:$lib/logback/*"
+  fi
+  CLASSPATH="$CLASSPATH:$lib/ahz/*"
+  export CLASSPATH
 }
 
 # Call one of the following functions to setup the classpath or write your own
diff --git a/modules/distribution/src/main/config/fluo.properties b/modules/distribution/src/main/config/fluo.properties.deprecated
similarity index 95%
rename from modules/distribution/src/main/config/fluo.properties
rename to modules/distribution/src/main/config/fluo.properties.deprecated
index f9f603d..4bec6aa 100644
--- a/modules/distribution/src/main/config/fluo.properties
+++ b/modules/distribution/src/main/config/fluo.properties.deprecated
@@ -15,7 +15,10 @@
 # Fluo properties
 #################
 
-# NOTE - All properties that have a default are set with it.  Uncomment
+# NOTE - This file has been deprecated and replaced by fluo-conn.properties & fluo-app.properties! 
+# If you would like to use this file instead, rename it to fluo.properties and remove fluo-conn.properties.
+#
+# All properties that have a default are set with it.  Uncomment
 # a property if you want to use a value different than the default. 
 # Properties that have no default are uncommented and must be set by
 # the user.  Most are unset except for fluo.accumulo.classpath which
diff --git a/modules/distribution/src/main/config/log4j.properties b/modules/distribution/src/main/config/log4j.properties
new file mode 100644
index 0000000..7b2feb7
--- /dev/null
+++ b/modules/distribution/src/main/config/log4j.properties
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+## Log4j 1.2 file that configures logging for Fluo processes
+## The system properties referenced below are configured by fluo-env.sh
+
+## Define a log file appender
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=${fluo.log.dir}/${fluo.log.id}.log
+log4j.appender.file.MaxFileSize=100MB
+log4j.appender.file.MaxBackupIndex=10
+log4j.appender.file.Threshold=INFO
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ISO8601} [%-8c{2}] %-5p: %m%n
+
+## Constrain some particularly spammy loggers
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.apache.curator=ERROR
+
+## Append most logs to file
+log4j.rootLogger=DEBUG, file
diff --git a/modules/distribution/src/main/config/log4j.xml b/modules/distribution/src/main/config/log4j.xml
deleted file mode 100644
index 65b2355..0000000
--- a/modules/distribution/src/main/config/log4j.xml
+++ /dev/null
@@ -1,44 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more contributor license
-  agreements. See the NOTICE file distributed with this work for additional information regarding
-  copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance with the License. You may obtain a
-  copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software distributed under the License
-  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-  or implied. See the License for the specific language governing permissions and limitations under
-  the License.
--->
-
-<!--
-This file configures logging for MiniFluo (when using 'mini-fluo' command)
--->
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-  <appender name="A2" class="org.apache.log4j.RollingFileAppender">
-    <param name="File" value="${fluo.log.dir}/${fluo.log.app}_${fluo.log.host}.log" />
-    <param name="MaxFileSize" value="1000MB" />
-    <param name="MaxBackupIndex" value="10" />
-    <param name="Threshold" value="DEBUG" />
-    <layout class="org.apache.log4j.PatternLayout">
-      <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: %m%n" />
-    </layout>
-  </appender>
-
-  <logger name="org.apache.zookeeper">
-    <level value="ERROR" />
-  </logger>
-
-  <logger name="org.apache.curator">
-    <level value="ERROR" />
-  </logger>
-
-  <root>
-    <level value="DEBUG" />
-    <appender-ref ref="A2" />
-  </root>
-</log4j:configuration>
diff --git a/modules/distribution/src/main/lib/fetch.sh b/modules/distribution/src/main/lib/fetch.sh
index 4f1e4b3..fcffbcb 100755
--- a/modules/distribution/src/main/lib/fetch.sh
+++ b/modules/distribution/src/main/lib/fetch.sh
@@ -57,14 +57,10 @@ extra)
   download aopalliance:aopalliance:jar:1.0
   download ch.qos.logback:logback-classic:jar:1.1.3 ./logback
   download ch.qos.logback:logback-core:jar:1.1.3 ./logback
-  download com.101tec:zkclient:jar:0.3
   download com.beust:jcommander:jar:1.32
-  download com.google.code.findbugs:jsr305:jar:2.0.1
   download com.google.code.gson:gson:jar:2.2.4
   download com.google.guava:guava:jar:13.0.1
   download com.google.inject:guice:jar:4.0
-  download com.yammer.metrics:metrics-annotation:jar:2.2.0
-  download com.yammer.metrics:metrics-core:jar:2.2.0
   download commons-collections:commons-collections:jar:3.2.1
   download commons-configuration:commons-configuration:jar:1.10
   download commons-io:commons-io:jar:2.4
@@ -72,32 +68,38 @@ extra)
   download io.dropwizard.metrics:metrics-graphite:jar:3.1.1
   download javax.inject:javax.inject:jar:1
   download log4j:log4j:jar:1.2.17 ./log4j
-  download net.sf.jopt-simple:jopt-simple:jar:3.2
   download org.apache.curator:curator-client:jar:2.7.1
   download org.apache.curator:curator-framework:jar:2.7.1
   download org.apache.curator:curator-recipes:jar:2.7.1
-  download org.apache.kafka:kafka_2.10:jar:0.8.0
-  download org.apache.twill:twill-api:jar:0.6.0-incubating
-  download org.apache.twill:twill-common:jar:0.6.0-incubating
-  download org.apache.twill:twill-core:jar:0.6.0-incubating
-  download org.apache.twill:twill-discovery-api:jar:0.6.0-incubating
-  download org.apache.twill:twill-discovery-core:jar:0.6.0-incubating
-  download org.apache.twill:twill-yarn:jar:0.6.0-incubating
-  download org.apache.twill:twill-zookeeper:jar:0.6.0-incubating
   download org.hdrhistogram:HdrHistogram:jar:2.1.8
   download org.mpierce.metrics.reservoir:hdrhistogram-metrics-reservoir:jar:1.1.0
-  download org.ow2.asm:asm-all:jar:5.0.2
-  download org.scala-lang:scala-compiler:jar:2.10.1
-  download org.scala-lang:scala-library:jar:2.10.1
-  download org.scala-lang:scala-reflect:jar:2.10.1
   download org.slf4j:jcl-over-slf4j:jar:1.7.2
   download org.slf4j:log4j-over-slf4j:jar:1.7.12 ./logback
   download org.slf4j:slf4j-api:jar:1.7.12
   download org.slf4j:slf4j-log4j12:jar:1.7.12 ./log4j
-  download org.xerial.snappy:snappy-java:jar:1.0.5
-  # See https://github.com/apache/fluo/issues/820
+  # See https://github.com/apache/incubator/issues/820
   download io.netty:netty:jar:3.9.9.Final
 
+  # Jars for deprecated launching in YARN (in Twill)
+  download com.101tec:zkclient:jar:0.3 ./twill
+  download com.google.code.findbugs:jsr305:jar:2.0.1 ./twill
+  download com.yammer.metrics:metrics-annotation:jar:2.2.0 ./twill
+  download com.yammer.metrics:metrics-core:jar:2.2.0 ./twill
+  download net.sf.jopt-simple:jopt-simple:jar:3.2 ./twill
+  download org.apache.kafka:kafka_2.10:jar:0.8.0 ./twill
+  download org.apache.twill:twill-api:jar:0.6.0-incubating ./twill
+  download org.apache.twill:twill-common:jar:0.6.0-incubating ./twill
+  download org.apache.twill:twill-core:jar:0.6.0-incubating ./twill
+  download org.apache.twill:twill-discovery-api:jar:0.6.0-incubating ./twill
+  download org.apache.twill:twill-discovery-core:jar:0.6.0-incubating ./twill
+  download org.apache.twill:twill-yarn:jar:0.6.0-incubating ./twill
+  download org.apache.twill:twill-zookeeper:jar:0.6.0-incubating ./twill
+  download org.ow2.asm:asm-all:jar:5.0.2 ./twill
+  download org.scala-lang:scala-compiler:jar:2.10.1 ./twill
+  download org.scala-lang:scala-library:jar:2.10.1 ./twill
+  download org.scala-lang:scala-reflect:jar:2.10.1 ./twill
+  download org.xerial.snappy:snappy-java:jar:1.0.5 ./twill
+
   echo -e "Done!\n"
   echo "NOTE - The dependencies downloaded have been tested with some versions of Hadoop, Zookeeper, and Accumulo."
   echo "There is no guarantee they will work with all versions. Fluo chose to defer dependency resolution to as"
diff --git a/modules/distribution/src/main/scripts/fluo b/modules/distribution/src/main/scripts/fluo
index 8b08369..05cdd4b 100755
--- a/modules/distribution/src/main/scripts/fluo
+++ b/modules/distribution/src/main/scripts/fluo
@@ -13,36 +13,48 @@
 # or implied. See the License for the specific language governing permissions and limitations under
 # the License.
 
-# Start: Resolve Script Directory
 SOURCE="${BASH_SOURCE[0]}"
-while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink
+while [ -h "$SOURCE" ]; do
    bin="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
    SOURCE="$(readlink "$SOURCE")"
-   [[ $SOURCE != /* ]] && SOURCE="$bin/$SOURCE" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
+   [[ $SOURCE != /* ]] && SOURCE="$bin/$SOURCE"
 done
-bin="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
-script=$( basename "$SOURCE" )
-# Stop: Resolve Script Directory
-
-. "$bin"/impl/config.sh
+# Set up variables needed by fluo-env.sh
+export bin="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
+export basedir="$( cd -P ${bin}/.. && pwd )"
+export conf="$basedir/conf"
+export lib="$basedir/lib"
+export cmd="$1"
+case "$cmd" in
+  oracle|worker|init) export app="$2" ;;
+esac
+export FLUO_VERSION=${project.version}
 
+if [ ! -f "$conf/fluo-env.sh" ]; then
+  echo "fluo-env.sh must exist in $conf"
+  exit 1
+fi
+source "$conf/fluo-env.sh"
 export CLASSPATH
 
+
+deprecated_fluo_props=$conf/fluo.properties
+
+if [[ -f "$FLUO_CONN_PROPS" && -f "$deprecated_fluo_props" ]]; then
+  echo "Fluo is being configured by $FLUO_CONN_PROPS and $deprecated_fluo_props. Remove one of these files."
+  exit 1
+fi
+
 # stop if any command fails
 set -e
 
-APP=$2
-APP_DIR=$FLUO_HOME/apps/$APP
-APP_CONF_DIR=$APP_DIR/conf
-APP_LIB_DIR=$APP_DIR/lib
-
 function copy_config {
-  if [ -f $FLUO_CONF_DIR/$1 ]; then
-    cp $FLUO_CONF_DIR/$1 $APP_CONF_DIR/
-  elif [ -f $FLUO_CONF_DIR/examples/$1 ]; then
-    cp FLUO_CONF_DIR/examples/$1 $APP_CONF_DIR
+  if [ -f "$conf/$1" ]; then
+    cp "$conf/$1" "$APP_CONF_DIR/"
+  elif [ -f "$conf/examples/$1" ]; then
+    cp "$conf/examples/$1" "$APP_CONF_DIR"
   else
-    echo "Config file $1 not found in $FLUO_CONF_DIR or $FLUO_CONF_DIR/examples"
+    echo "Config file $1 not found in $conf or $conf/examples"
     exit 1
   fi
 }
@@ -50,36 +62,61 @@ function copy_config {
 function print_usage {
   echo -e "Usage: fluo <command> (<argument> ...)\n"
   echo -e "Possible commands:\n"
-  echo "  list          Lists all Fluo applications in Fluo instance"
-  echo "  new <app>     Creates configuration for new application in apps/"
-  echo "  init <app>    Initializes Fluo application using configuration in apps/<app>/conf/fluo.properties"
-  echo "  start <app>   Starts Fluo application on cluster"
-  echo "  stop <app>    Stops Fluo application on cluster"
-  echo "  kill <app>    Kills Fluo application on cluster"
-  echo "  status <app>  Prints status of Fluo application"
-  echo "  info <app>    Prints information about containers of Fluo application"
-  echo "  scan <app>    Prints snapshot of data in Fluo application"
-  echo "  classpath     Prints the classpath setup in fluo-env.sh"
-  echo "  wait <app>    Waits until all notifications are processed"
-  echo "  version       Prints the version of Fluo"
-  echo "  exec <app> <class>  {<argument>}";
+  echo "  init <app> <appProps> {<arg>} Initializes Fluo application for <app> using <appProps>. Run with -h to see additional args."
+  echo "  classpath                     Prints the classpath setup in fluo-env.sh"
+  echo "  list                          Lists all Fluo applications in Fluo instance"
+  echo "  config <app>                  Prints application configuration stored in Zookeeper for <app>"
+  echo "  scan <app>                    Prints snapshot of data in Fluo <app>"
+  echo "  stop <app>                    Stops Fluo application processes on this machine for <app>"
+  echo "  oracle <app>                  Starts Fluo Oracle process for <app>"
+  echo "  worker <app>                  Starts Fluo Worker process for <app>"
+  echo "  version                       Prints the version of Fluo"
+  echo "  wait <app>                    Waits until all notifications are processed for <app>"
+  echo "  exec <app> <class> {<arg>}    Executes <class> with <args> using classpath for <app>";
+
+  echo -e "\nDeprecated commands (available if fluo.properties exists):\n"
+  echo "  new <app>     (Deprecated) Creates configuration for new application in apps/"
+  echo "  start <app>   (Deprecated) Starts Fluo application on cluster"
+  echo "  init <app>    (Deprecated) Initializes Fluo application using configuration in apps/<app>/conf/fluo.properties"
+  echo "  kill <app>    (Deprecated) Kills Fluo application on cluster"
+  echo "  status <app>  (Deprecated) Prints status of Fluo application"
+  echo "  info <app>    (Deprecated) Prints information about containers of Fluo application"
   echo " "
   exit 1
 }
 
-function validate_app {
-  if [ -z "$APP" ]; then
+function check_conn_props {
+  if [ ! -f "$FLUO_CONN_PROPS" ]; then
+    echo "$FLUO_CONN_PROPS must exist!"
+    exit 1
+  fi
+}
+
+function verify_app {
+  if [ -z "$1" ]; then
     echo -e "The application name (set by <app>) cannot be an empty string!\n"
     print_usage
   fi
-  if [[ $APP = *"-h"* ]]; then
+  if [[ $1 = *"-h"* ]]; then
     print_usage
   fi
 }
 
-function validate_app_full {
-  validate_app
-  java org.apache.fluo.cluster.util.ValidateAppName $APP
+function deprecated_verify {
+  verify_app "$1"
+  APP=$1
+  APP_DIR=$basedir/apps/$APP
+  APP_CONF_DIR=$APP_DIR/conf
+  APP_LIB_DIR=$APP_DIR/lib
+  if [ ! -f "$deprecated_fluo_props" ]; then
+    echo "ERROR - This command is deprecated can only be used if fluo.properties exists in $conf"
+    exit 1
+  fi
+}
+
+function deprecated_verify_full {
+  deprecated_verify "$1"
+  java org.apache.fluo.cluster.util.ValidateAppName "$APP"
   if [[ ! -d $APP_DIR || ! -d $APP_CONF_DIR || ! -d $APP_LIB_DIR  ]]; then
     echo "ERROR - The Fluo '$APP' application needs to be configured in apps/ with a conf/ and lib/ directory.  Use 'fluo new $APP' to create this configuration"
     exit 1
@@ -97,60 +134,159 @@ function check_hadoop {
   fi
 }
 
+function setup_service {
+  verify_app "$1"
+  check_conn_props
+  app_run_dir=$basedir/run/$app
+  mkdir -p "$app_run_dir" 2>/dev/null
+  app_log_dir=$FLUO_LOG_DIR/$app
+  mkdir -p "$app_log_dir" 2>/dev/null
+  app_lib=$lib/apps/$1
+  mkdir -p "$app_lib"
+  java org.apache.fluo.command.FluoGetJars "$FLUO_CONN_PROPS" "$1" "$app_lib"
+  export CLASSPATH="$conf:$app_lib/*:$CLASSPATH"
+}
+
 case "$1" in
-new)
-  validate_app
-  java org.apache.fluo.cluster.util.ValidateAppName $APP
-  if [ -d $APP_DIR ]; then
-    echo "The Fluo '$APP' application already has a directory in apps/"
-    exit 1
-  fi
-  mkdir -p $APP_DIR
-  mkdir -p $APP_CONF_DIR
-  mkdir -p $APP_LIB_DIR
-  copy_config fluo.properties
-  $SED "s/fluo.client.application.name=/fluo.client.application.name=$APP/g" $APP_CONF_DIR/fluo.properties
-  copy_config logback.xml
+config)
+  verify_app "$2"
+  check_conn_props
+  java org.apache.fluo.command.FluoConfig "$FLUO_CONN_PROPS" "$2"
   ;;
 init)
-  validate_app_full
-  check_hadoop
-  if [[ $@ != *"-h"* && $@ != *"-u"* ]]; then
-    echo "Copying Fluo jars to HDFS at /fluo/lib to be accessible by Accumulo for iterators"
-    $HADOOP_PREFIX/bin/hdfs dfs -mkdir -p /fluo/lib
-    echo "Copying `ls $FLUO_HOME/lib/fluo-api-*.jar` to HDFS"
-    $HADOOP_PREFIX/bin/hdfs dfs -copyFromLocal -f $FLUO_HOME/lib/fluo-api-*.jar /fluo/lib/
-    echo "Copying `ls $FLUO_HOME/lib/fluo-accumulo-*.jar` to HDFS"
-    $HADOOP_PREFIX/bin/hdfs dfs -copyFromLocal -f $FLUO_HOME/lib/fluo-accumulo-*.jar /fluo/lib/
+  if [ -f "$FLUO_CONN_PROPS" ]; then
+    check_conn_props
+    if [ -z "$app" ]; then
+      echo -e "The application name (set by <app>) cannot be an empty string!\n"
+      print_usage
+    fi
+    if [[ "$app" = *"-h"* ]]; then
+      java org.apache.fluo.command.FluoInit "$FLUO_CONN_PROPS" none /dev/null "$2"
+      exit 1
+    fi
+    app_props=$3
+    if [ ! -f "$app_props" ]; then
+      echo "<appProps> is set to path '$app_props' that does not exist"
+      print_usage
+      exit 1
+    fi
+    init_dir=$(java org.apache.fluo.command.FluoGetProp "$FLUO_CONN_PROPS" "$app_props" fluo.observer.init.dir)
+    if [ -d "$init_dir" ]; then
+      echo "Adding $init_dir/* to CLASSPATH"
+      export CLASSPATH="$init_dir/*:$CLASSPATH"
+    fi
+    java org.apache.fluo.command.FluoInit "$FLUO_CONN_PROPS" "$app" "$app_props" ${*:4}
+  else
+    deprecated_verify_full "$2"
+    check_hadoop
+    export CLASSPATH="$APP_LIB_DIR/*:$CLASSPATH"
+    java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX" "$@"
   fi
-  export CLASSPATH="$APP_LIB_DIR/*:$CLASSPATH"
-  java org.apache.fluo.cluster.command.FluoCommand $FLUO_HOME $HADOOP_PREFIX "$@"
   ;;
-start)
-  validate_app_full
-  check_hadoop
-  export CLASSPATH="$APP_LIB_DIR/*:$CLASSPATH"
-  java org.apache.fluo.cluster.command.FluoCommand $FLUO_HOME $HADOOP_PREFIX "$@"
+oracle)
+  setup_service "$2"
+  nohup java "${SERVICE_OPTS[@]}" org.apache.fluo.command.FluoOracle "$FLUO_CONN_PROPS" "$2" > "${app_log_dir}/${FLUO_LOG_ID}.out" 2> "${app_log_dir}/${FLUO_LOG_ID}.err" &
+  echo "$!" > "${app_run_dir}/${FLUO_LOG_ID}.pid"
+  echo "Started Oracle for '$2' Fluo application. Logs can be found in $app_log_dir"
   ;;
-stop|kill|status|info|scan|wait)
-  validate_app
-  check_hadoop
-  java org.apache.fluo.cluster.command.FluoCommand $FLUO_HOME $HADOOP_PREFIX "$@"
+worker)
+  setup_service "$2"
+  nohup java "${SERVICE_OPTS[@]}" org.apache.fluo.command.FluoWorker "$FLUO_CONN_PROPS" "$2" > "${app_log_dir}/${FLUO_LOG_ID}.out" 2> "${app_log_dir}/${FLUO_LOG_ID}.err" &
+  echo "$!" > "${app_run_dir}/${FLUO_LOG_ID}.pid"
+  echo "Started Worker for '$2' Fluo application. Logs can be found in $app_log_dir"
+  ;;
+scan)
+  if [ -f "$FLUO_CONN_PROPS" ]; then
+    verify_app "$2"
+    java org.apache.fluo.command.FluoScan "$FLUO_CONN_PROPS" ${*:2}
+  else
+    check_hadoop
+    java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX" "$@"
+  fi
+  ;;
+stop)
+  if [ -f "$FLUO_CONN_PROPS" ]; then
+    verify_app "$2"
+    echo "Stopping all processes for ${2} application"
+    for pid_file in $basedir/run/$2/*.pid; do
+      if [ -f "$pid_file" ]; then
+        kill -s KILL "$(cat "$pid_file")" 2>/dev/null
+        rm -f "${pid_file}" 2>/dev/null
+      fi 
+    done
+  else 
+    check_hadoop
+    java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX" "$@"
+  fi
+  ;;
+ps) 
+  jps -m | grep Fluo
   ;;
 list)
-  check_hadoop
-  java org.apache.fluo.cluster.command.FluoCommand $FLUO_HOME $HADOOP_PREFIX $1 app ${*:2}
+  if [ -f "$FLUO_CONN_PROPS" ]; then
+    java org.apache.fluo.command.FluoList "$FLUO_CONN_PROPS"
+  else
+    check_hadoop
+    java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX" list app ${*:2}
+  fi
   ;;
 classpath)
   echo "$CLASSPATH"
   ;;
 exec)
-  export CLASSPATH="$APP_LIB_DIR/*:$CLASSPATH"
-  java org.apache.fluo.cluster.command.FluoCommand $FLUO_HOME $HADOOP_PREFIX "$@"
+  if [ -f "$FLUO_CONN_PROPS" ]; then
+    setup_service $2
+    java org.apache.fluo.command.FluoExec "$FLUO_CONN_PROPS" ${*:2}
+  else
+    deprecated_verify "$2"
+    export CLASSPATH="$APP_LIB_DIR/*:$CLASSPATH"
+    java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX" "$@"
+  fi
   ;;
 version)
   echo "$FLUO_VERSION"
   ;;
+wait)
+  if [ -f "$FLUO_CONN_PROPS" ]; then
+    verify_app "$2"
+    java org.apache.fluo.command.FluoWait "$FLUO_CONN_PROPS" $2
+  else
+    deprecated_verify "$2"
+    export CLASSPATH="$APP_LIB_DIR/*:$CLASSPATH"
+    java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX" "$@"
+  fi
+  ;;
+# Commands below this comment are deprecated
+new)
+  deprecated_verify "$2"
+  java org.apache.fluo.cluster.util.ValidateAppName "$APP"
+  if [ -d "$APP_DIR" ]; then
+    echo "The Fluo '$APP' application already has a directory in apps/"
+    exit 1
+  fi
+  mkdir -p "$APP_DIR"
+  mkdir -p "$APP_CONF_DIR"
+  mkdir -p "$APP_LIB_DIR"
+  copy_config fluo.properties
+  if [[ "$OSTYPE" == "darwin"* ]]; then
+    sed_cmd="sed -i .bak"
+  else
+    sed_cmd="sed -i"
+  fi
+  $sed_cmd "s/fluo.client.application.name=/fluo.client.application.name=$APP/g" "$APP_CONF_DIR/fluo.properties"
+  copy_config logback.xml
+  ;;
+start)
+  deprecated_verify_full "$2"
+  check_hadoop
+  export CLASSPATH="$APP_LIB_DIR/*:$CLASSPATH"
+  java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX" "$@"
+  ;;
+kill|status|info)
+  deprecated_verify "$2"
+  check_hadoop
+  java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX" "$@"
+  ;;
 *)
   print_usage
 esac
diff --git a/modules/distribution/src/main/scripts/impl/config.sh b/modules/distribution/src/main/scripts/impl/config.sh
deleted file mode 100755
index 4b46085..0000000
--- a/modules/distribution/src/main/scripts/impl/config.sh
+++ /dev/null
@@ -1,67 +0,0 @@
-#! /usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more contributor license
-# agreements. See the NOTICE file distributed with this work for additional information regarding
-# copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance with the License. You may obtain a
-# copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software distributed under the License
-# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-# or implied. See the License for the specific language governing permissions and limitations under
-# the License.
-
-# Start: Resolve Script Directory
-SOURCE="${BASH_SOURCE[0]}"
-while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink
-   impl="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
-   SOURCE="$(readlink "$SOURCE")"
-   [[ $SOURCE != /* ]] && SOURCE="$impl/$SOURCE" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
-done
-impl="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
-bin="$( cd -P "$( dirname "$impl" )" && pwd )"
-script=$( basename "$SOURCE" )
-# Stop: Resolve Script Directory
-
-# Determine FLUO_HOME - Use env variable set by user.  If none set, calculate using bin dir
-FLUO_HOME="${FLUO_HOME:-$( cd -P ${bin}/.. && pwd )}"
-export FLUO_HOME
-if [ -z "$FLUO_HOME" -o ! -d "$FLUO_HOME" ]
-then
-  echo "FLUO_HOME=$FLUO_HOME is not a valid directory.  Please make sure it exists"
-  exit 1
-fi
-
-# Determine FLUO_CONF_DIR - Use env variable set by user.  If none set, calculate using FLUO_HOME
-FLUO_CONF_DIR="${FLUO_CONF_DIR:-$FLUO_HOME/conf}"
-export FLUO_CONF_DIR
-if [ -z "$FLUO_CONF_DIR" -o ! -d "$FLUO_CONF_DIR" ]
-then
-  echo "FLUO_CONF_DIR=$FLUO_CONF_DIR is not a valid directory.  Please make sure it exists"
-  exit 1
-fi
-
-# Determine FLUO_LIB_DIR - Use env variable set by user.  If none set, calculate using FLUO_HOME
-FLUO_LIB_DIR="${FLUO_LIB_DIR:-$FLUO_HOME/lib}"
-export FLUO_LIB_DIR
-if [ -z "$FLUO_CONF_DIR" -o ! -d "$FLUO_LIB_DIR" ]
-then
-  echo "FLUO_LIB_DIR=$FLUO_LIB_DIR is not a valid directory.  Please make sure it exists"
-  exit 1
-fi
-
-if [ -f "$bin"/../conf/fluo-env.sh ]; then
-    . "$bin"/../conf/fluo-env.sh
-fi
-
-if [[ "$OSTYPE" == "darwin"* ]]; then
-  export MD5=md5
-  export SED="sed -i .bak"
-else
-  export MD5=md5sum
-  export SED="sed -i"
-fi
-
-. $impl/fluo-version.sh
diff --git a/modules/distribution/src/main/scripts/impl/fluo-version.sh b/modules/distribution/src/main/scripts/impl/fluo-version.sh
deleted file mode 100644
index 30dd115..0000000
--- a/modules/distribution/src/main/scripts/impl/fluo-version.sh
+++ /dev/null
@@ -1,17 +0,0 @@
-#! /usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more contributor license
-# agreements. See the NOTICE file distributed with this work for additional information regarding
-# copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance with the License. You may obtain a
-# copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software distributed under the License
-# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-# or implied. See the License for the specific language governing permissions and limitations under
-# the License.
-
-# Maven should set correct Fluo version below at build time
-FLUO_VERSION=${project.version}
diff --git a/modules/distribution/src/main/scripts/local-fluo b/modules/distribution/src/main/scripts/local-fluo
index 16ce77e..5fad526 100755
--- a/modules/distribution/src/main/scripts/local-fluo
+++ b/modules/distribution/src/main/scripts/local-fluo
@@ -13,18 +13,24 @@
 # or implied. See the License for the specific language governing permissions and limitations under
 # the License.
 
-# Start: Resolve Script Directory
 SOURCE="${BASH_SOURCE[0]}"
-while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink
+while [ -h "$SOURCE" ]; do
    bin="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
    SOURCE="$(readlink "$SOURCE")"
-   [[ $SOURCE != /* ]] && SOURCE="$bin/$SOURCE" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
+   [[ $SOURCE != /* ]] && SOURCE="$bin/$SOURCE"
 done
-bin="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
-script=$( basename "$SOURCE" )
-# Stop: Resolve Script Directory
+# Set up variables needed by fluo-env.sh
+export bin="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
+export basedir="$( cd -P ${bin}/.. && pwd )"
+export conf="$basedir/conf"
+export lib="$basedir/lib"
+export cmd="$1"
+export FLUO_HOME="$basedir"
+FLUO_VERSION=${project.version}
 
-. "$bin"/impl/config.sh
+if [ -f "$conf/fluo-env.sh" ]; then
+  . "$conf"/fluo-env.sh
+fi
 
 export CLASSPATH
 
@@ -39,6 +45,7 @@ APP_LOG_DIR=$APP_DIR/logs
 
 function print_usage {
   echo -e "Usage: local-fluo <command> (<argument> ...)\n"
+  echo -e "This script is now deprecated.  Use 'fluo' script to start local processes\n"
   echo -e "Possible commands:\n"
   echo "  start-oracle <app>    Starts local Fluo oracle for application"
   echo "  start-worker <app>    Starts local Fluo worker for application"
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
index 4a28236..58eface 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
@@ -21,7 +21,9 @@ import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
 import org.apache.fluo.api.client.FluoAdmin.InitializationOptions;
 import org.apache.fluo.api.client.FluoAdmin.TableExistsException;
+import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.core.client.FluoAdminImpl;
+import org.apache.fluo.core.client.FluoClientImpl;
 import org.apache.fluo.core.util.CuratorUtil;
 import org.apache.fluo.integration.ITBaseImpl;
 import org.junit.Assert;
@@ -72,6 +74,30 @@ public class FluoAdminImplIT extends ITBaseImpl {
   }
 
   @Test
+  public void testInitializeConfig() throws Exception {
+
+    // stop oracle to avoid spurious exceptions when initializing
+    oserver.stop();
+
+    FluoConfiguration localConfig = new FluoConfiguration(config);
+    localConfig.setAccumuloClasspath("${fluo.connection.application.name}");
+    Assert.assertEquals(localConfig.getApplicationName(), localConfig.getAccumuloClasspath());
+
+    try (FluoAdmin admin = new FluoAdminImpl(localConfig)) {
+
+      InitializationOptions opts =
+          new InitializationOptions().setClearZookeeper(true).setClearTable(true);
+      admin.initialize(opts);
+    }
+
+    try (FluoClientImpl client = new FluoClientImpl(localConfig)) {
+      FluoConfiguration sharedConfig = client.getSharedConfiguration();
+      Assert.assertEquals(localConfig.getAccumuloClasspath(), sharedConfig.getAccumuloClasspath());
+      Assert.assertEquals(sharedConfig.getApplicationName(), sharedConfig.getAccumuloClasspath());
+    }
+  }
+
+  @Test
   public void testInitializeWithNoChroot() throws Exception {
 
     // stop oracle to avoid spurious exceptions when initializing
diff --git a/modules/core/src/test/java/org/apache/fluo/core/client/FluoClientTest.java b/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoClientIT.java
similarity index 54%
rename from modules/core/src/test/java/org/apache/fluo/core/client/FluoClientTest.java
rename to modules/integration/src/test/java/org/apache/fluo/integration/client/FluoClientIT.java
index 33d1291..3dfd35f 100644
--- a/modules/core/src/test/java/org/apache/fluo/core/client/FluoClientTest.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/client/FluoClientIT.java
@@ -13,50 +13,67 @@
  * the License.
  */
 
-package org.apache.fluo.core.client;
+package org.apache.fluo.integration.client;
 
+import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.fluo.integration.ITBaseImpl;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class FluoClientTest {
+public class FluoClientIT extends ITBaseImpl {
 
   @Test
-  public void testFailures() {
+  public void testBasic() {
+    try (FluoClient client = FluoFactory.newClient(config)) {
+      client.newSnapshot();
+    }
+
+    FluoConfiguration fluoConfig = new FluoConfiguration();
+    fluoConfig.setApplicationName(config.getApplicationName());
+    fluoConfig.setInstanceZookeepers(config.getInstanceZookeepers());
+
+    try (FluoClient client = FluoFactory.newClient(fluoConfig)) {
+      client.newSnapshot();
+    }
+
+    try (FluoClientImpl client = new FluoClientImpl(fluoConfig)) {
+      client.newSnapshot();
+      FluoConfiguration sharedConfig = client.getSharedConfiguration();
+      Assert.assertEquals(config.getAccumuloTable(), sharedConfig.getAccumuloTable());
+      Assert.assertEquals(config.getAccumuloInstance(), sharedConfig.getAccumuloInstance());
+      Assert.assertEquals(config.getAccumuloUser(), sharedConfig.getAccumuloUser());
+      Assert.assertEquals(config.getZookeeperTimeout(), sharedConfig.getZookeeperTimeout());
+      Assert.assertEquals(config.getTransactionRollbackTime(),
+          sharedConfig.getTransactionRollbackTime());
+    }
+  }
 
+  @Test
+  public void testFailures() {
     // we are expecting errors in this test
     Level clientLevel = Logger.getLogger(FluoClientImpl.class).getLevel();
     Logger.getLogger(FluoClientImpl.class).setLevel(Level.FATAL);
     Level factoryLevel = Logger.getLogger(FluoFactory.class).getLevel();
     Logger.getLogger(FluoFactory.class).setLevel(Level.FATAL);
 
-    FluoConfiguration config = new FluoConfiguration();
+    FluoConfiguration fluoConfig = new FluoConfiguration();
     try {
-      FluoFactory.newClient(config);
+      FluoFactory.newClient(fluoConfig);
       Assert.fail();
     } catch (FluoException e) {
     }
 
-    try (FluoClientImpl impl = new FluoClientImpl(config)) {
+    try (FluoClientImpl impl = new FluoClientImpl(fluoConfig)) {
       Assert.fail("FluoClientImpl was " + impl);
     } catch (IllegalArgumentException e) {
     }
 
-    config.setApplicationName("test");
-    config.setAccumuloUser("test");
-    config.setAccumuloPassword("test");
-    config.setAccumuloInstance("test");
-    config.setZookeeperTimeout(5);
-
-    try (FluoClientImpl impl = new FluoClientImpl(config)) {
-      Assert.fail("FluoClientImpl was " + impl);
-    } catch (IllegalStateException e) {
-    }
-
     Logger.getLogger(FluoClientImpl.class).setLevel(clientLevel);
     Logger.getLogger(FluoFactory.class).setLevel(factoryLevel);
   }
diff --git a/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java b/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
index 6d1ce5d..96513b8 100644
--- a/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
+++ b/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
@@ -121,7 +121,7 @@ public class MiniFluoImpl implements MiniFluo {
       config.setInstanceZookeepers(cluster.getZooKeepers() + "/fluo");
 
       // configuration that only needs to be set if not by user
-      if ((config.containsKey(FluoConfiguration.ADMIN_ACCUMULO_TABLE_PROP) == false)
+      if ((config.containsKey(FluoConfiguration.ACCUMULO_TABLE_PROP) == false)
           || config.getAccumuloTable().trim().isEmpty()) {
         config.setAccumuloTable("fluo");
       }
diff --git a/pom.xml b/pom.xml
index 6074d50..fd4f473 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,6 +37,7 @@
     <module>modules/api</module>
     <module>modules/cluster</module>
     <module>modules/core</module>
+    <module>modules/command</module>
     <module>modules/distribution</module>
     <module>modules/integration</module>
     <module>modules/mapreduce</module>
@@ -170,6 +171,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.fluo</groupId>
+        <artifactId>fluo-command</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.fluo</groupId>
         <artifactId>fluo-core</artifactId>
         <version>${project.version}</version>
       </dependency>

-- 
To stop receiving notification emails like this one, please contact
['"commits@fluo.apache.org" <co...@fluo.apache.org>'].