You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by mw...@apache.org on 2016/07/01 14:32:05 UTC

incubator-fluo git commit: Fixes #692 - Move YARN specific configuration out of FluoConfiguration

Repository: incubator-fluo
Updated Branches:
  refs/heads/master 734061bd4 -> 31f1c1e6b


Fixes #692 - Move YARN specific configuration out of FluoConfiguration


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/31f1c1e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/31f1c1e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/31f1c1e6

Branch: refs/heads/master
Commit: 31f1c1e6b571bb6f60dee94e30f7dcb7e51f399a
Parents: 734061b
Author: Mike Walch <mw...@gmail.com>
Authored: Fri Jul 1 09:41:58 2016 -0400
Committer: Mike Walch <mw...@gmail.com>
Committed: Fri Jul 1 10:03:35 2016 -0400

----------------------------------------------------------------------
 .../fluo/api/config/FluoConfiguration.java      | 71 --------------------
 .../fluo/api/config/FluoConfigurationTest.java  | 28 ++------
 .../apache/fluo/cluster/runner/AppRunner.java   |  3 +-
 .../fluo/cluster/runner/YarnAppRunner.java      | 17 ++---
 .../fluo/cluster/util/FluoYarnConfig.java       | 67 ++++++++++++++++++
 .../apache/fluo/cluster/yarn/FluoTwillApp.java  | 30 +++++----
 .../src/main/config/fluo.properties             | 30 ++++-----
 7 files changed, 118 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
index 9022fa5..f00dfca 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
@@ -71,13 +71,7 @@ public class FluoConfiguration extends CompositeConfiguration {
   // Worker
   private static final String WORKER_PREFIX = FLUO_PREFIX + ".worker";
   public static final String WORKER_NUM_THREADS_PROP = WORKER_PREFIX + ".num.threads";
-  public static final String WORKER_INSTANCES_PROP = WORKER_PREFIX + ".instances";
-  public static final String WORKER_MAX_MEMORY_MB_PROP = WORKER_PREFIX + ".max.memory.mb";
-  public static final String WORKER_NUM_CORES_PROP = WORKER_PREFIX + ".num.cores";
   public static final int WORKER_NUM_THREADS_DEFAULT = 10;
-  public static final int WORKER_INSTANCES_DEFAULT = 1;
-  public static final int WORKER_MAX_MEMORY_MB_DEFAULT = 1024;
-  public static final int WORKER_NUM_CORES_DEFAULT = 1;
 
   // Loader
   private static final String LOADER_PREFIX = FLUO_PREFIX + ".loader";
@@ -88,12 +82,6 @@ public class FluoConfiguration extends CompositeConfiguration {
 
   // Oracle
   private static final String ORACLE_PREFIX = FLUO_PREFIX + ".oracle";
-  public static final String ORACLE_INSTANCES_PROP = ORACLE_PREFIX + ".instances";
-  public static final String ORACLE_MAX_MEMORY_MB_PROP = ORACLE_PREFIX + ".max.memory.mb";
-  public static final String ORACLE_NUM_CORES_PROP = ORACLE_PREFIX + ".num.cores";
-  public static final int ORACLE_INSTANCES_DEFAULT = 1;
-  public static final int ORACLE_MAX_MEMORY_MB_DEFAULT = 512;
-  public static final int ORACLE_NUM_CORES_DEFAULT = 1;
 
   // MiniFluo
   private static final String MINI_PREFIX = FLUO_PREFIX + ".mini";
@@ -169,13 +157,7 @@ public class FluoConfiguration extends CompositeConfiguration {
     getLoaderQueueSize();
     getLoaderThreads();
     getObserverConfig();
-    getOracleInstances();
-    getOracleMaxMemory();
-    getOracleNumCores();
     getTransactionRollbackTime();
-    getWorkerInstances();
-    getWorkerMaxMemory();
-    getWorkerNumCores();
     getWorkerThreads();
     getZookeeperTimeout();
   }
@@ -440,30 +422,6 @@ public class FluoConfiguration extends CompositeConfiguration {
     return getPositiveLong(TRANSACTION_ROLLBACK_TIME_PROP, TRANSACTION_ROLLBACK_TIME_DEFAULT);
   }
 
-  public FluoConfiguration setWorkerInstances(int workerInstances) {
-    return setPositiveInt(WORKER_INSTANCES_PROP, workerInstances);
-  }
-
-  public int getWorkerInstances() {
-    return getPositiveInt(WORKER_INSTANCES_PROP, WORKER_INSTANCES_DEFAULT);
-  }
-
-  public FluoConfiguration setWorkerMaxMemory(int maxMemoryMB) {
-    return setPositiveInt(WORKER_MAX_MEMORY_MB_PROP, maxMemoryMB);
-  }
-
-  public int getWorkerMaxMemory() {
-    return getPositiveInt(WORKER_MAX_MEMORY_MB_PROP, WORKER_MAX_MEMORY_MB_DEFAULT);
-  }
-
-  public FluoConfiguration setWorkerNumCores(int numCores) {
-    return setPositiveInt(WORKER_NUM_CORES_PROP, numCores);
-  }
-
-  public int getWorkerNumCores() {
-    return getPositiveInt(WORKER_NUM_CORES_PROP, WORKER_NUM_CORES_DEFAULT);
-  }
-
   public FluoConfiguration setLoaderThreads(int numThreads) {
     return setNonNegativeInt(LOADER_NUM_THREADS_PROP, numThreads);
   }
@@ -480,30 +438,6 @@ public class FluoConfiguration extends CompositeConfiguration {
     return getNonNegativeInt(LOADER_QUEUE_SIZE_PROP, LOADER_QUEUE_SIZE_DEFAULT);
   }
 
-  public FluoConfiguration setOracleMaxMemory(int oracleMaxMemory) {
-    return setPositiveInt(ORACLE_MAX_MEMORY_MB_PROP, oracleMaxMemory);
-  }
-
-  public int getOracleMaxMemory() {
-    return getPositiveInt(ORACLE_MAX_MEMORY_MB_PROP, ORACLE_MAX_MEMORY_MB_DEFAULT);
-  }
-
-  public FluoConfiguration setOracleInstances(int oracleInstances) {
-    return setPositiveInt(ORACLE_INSTANCES_PROP, oracleInstances);
-  }
-
-  public int getOracleInstances() {
-    return getPositiveInt(ORACLE_INSTANCES_PROP, ORACLE_INSTANCES_DEFAULT);
-  }
-
-  public FluoConfiguration setOracleNumCores(int numCores) {
-    return setPositiveInt(ORACLE_NUM_CORES_PROP, numCores);
-  }
-
-  public int getOracleNumCores() {
-    return getPositiveInt(ORACLE_NUM_CORES_PROP, ORACLE_NUM_CORES_DEFAULT);
-  }
-
   /**
    * @param reporter The name of the reporter to get configuration for, i.e. console, jmx, graphite.
    * @return A {@link SubsetConfiguration} using the prefix {@value #REPORTER_PREFIX} with the
@@ -672,14 +606,9 @@ public class FluoConfiguration extends CompositeConfiguration {
     config.setProperty(CLIENT_ZOOKEEPER_TIMEOUT_PROP, CLIENT_ZOOKEEPER_TIMEOUT_DEFAULT);
     config.setProperty(CLIENT_ACCUMULO_ZOOKEEPERS_PROP, CLIENT_ACCUMULO_ZOOKEEPERS_DEFAULT);
     config.setProperty(WORKER_NUM_THREADS_PROP, WORKER_NUM_THREADS_DEFAULT);
-    config.setProperty(WORKER_INSTANCES_PROP, WORKER_INSTANCES_DEFAULT);
-    config.setProperty(WORKER_MAX_MEMORY_MB_PROP, WORKER_MAX_MEMORY_MB_DEFAULT);
-    config.setProperty(WORKER_NUM_CORES_PROP, WORKER_NUM_CORES_DEFAULT);
     config.setProperty(TRANSACTION_ROLLBACK_TIME_PROP, TRANSACTION_ROLLBACK_TIME_DEFAULT);
     config.setProperty(LOADER_NUM_THREADS_PROP, LOADER_NUM_THREADS_DEFAULT);
     config.setProperty(LOADER_QUEUE_SIZE_PROP, LOADER_QUEUE_SIZE_DEFAULT);
-    config.setProperty(ORACLE_MAX_MEMORY_MB_PROP, ORACLE_MAX_MEMORY_MB_DEFAULT);
-    config.setProperty(ORACLE_NUM_CORES_PROP, ORACLE_NUM_CORES_DEFAULT);
     config.setProperty(MINI_START_ACCUMULO_PROP, MINI_START_ACCUMULO_DEFAULT);
     config.setProperty(MINI_DATA_DIR_PROP, MINI_DATA_DIR_DEFAULT);
   }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
index 01b14e8..f8fb0c5 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
@@ -49,16 +49,10 @@ public class FluoConfigurationTest {
     Assert.assertEquals(FluoConfiguration.ADMIN_ACCUMULO_CLASSPATH_DEFAULT,
         base.getAccumuloClasspath());
     Assert.assertEquals(FluoConfiguration.WORKER_NUM_THREADS_DEFAULT, base.getWorkerThreads());
-    Assert.assertEquals(FluoConfiguration.WORKER_INSTANCES_DEFAULT, base.getWorkerInstances());
-    Assert.assertEquals(FluoConfiguration.WORKER_MAX_MEMORY_MB_DEFAULT, base.getWorkerMaxMemory());
-    Assert.assertEquals(FluoConfiguration.WORKER_NUM_CORES_DEFAULT, base.getWorkerNumCores());
     Assert.assertEquals(FluoConfiguration.TRANSACTION_ROLLBACK_TIME_DEFAULT,
         base.getTransactionRollbackTime());
     Assert.assertEquals(FluoConfiguration.LOADER_NUM_THREADS_DEFAULT, base.getLoaderThreads());
     Assert.assertEquals(FluoConfiguration.LOADER_QUEUE_SIZE_DEFAULT, base.getLoaderQueueSize());
-    Assert.assertEquals(FluoConfiguration.ORACLE_INSTANCES_DEFAULT, base.getOracleInstances());
-    Assert.assertEquals(FluoConfiguration.ORACLE_MAX_MEMORY_MB_DEFAULT, base.getOracleMaxMemory());
-    Assert.assertEquals(FluoConfiguration.ORACLE_NUM_CORES_DEFAULT, base.getOracleNumCores());
     Assert.assertEquals(FluoConfiguration.MINI_START_ACCUMULO_DEFAULT, base.getMiniStartAccumulo());
     Assert.assertTrue(base.getMiniDataDir().endsWith("/mini"));
   }
@@ -96,18 +90,12 @@ public class FluoConfigurationTest {
     Assert.assertEquals(0, config.setLoaderQueueSize(0).getLoaderQueueSize());
     Assert.assertEquals(7, config.setLoaderThreads(7).getLoaderThreads());
     Assert.assertEquals(0, config.setLoaderThreads(0).getLoaderThreads());
-    Assert.assertEquals(8, config.setOracleMaxMemory(8).getOracleMaxMemory());
-    Assert.assertEquals(10, config.setOracleInstances(10).getOracleInstances());
-    Assert.assertEquals(11, config.setWorkerInstances(11).getWorkerInstances());
-    Assert.assertEquals(12, config.setWorkerMaxMemory(12).getWorkerMaxMemory());
     Assert.assertEquals(13, config.setWorkerThreads(13).getWorkerThreads());
     Assert.assertEquals("zoos1", config.setInstanceZookeepers("zoos1").getInstanceZookeepers());
     Assert.assertEquals("zoos2", config.setAccumuloZookeepers("zoos2").getAccumuloZookeepers());
     Assert.assertEquals("app", config.setApplicationName("app").getApplicationName());
     Assert.assertEquals("zoos1/app", config.getAppZookeepers());
     Assert.assertEquals(14, config.setZookeeperTimeout(14).getZookeeperTimeout());
-    Assert.assertEquals(15, config.setWorkerNumCores(15).getWorkerNumCores());
-    Assert.assertEquals(16, config.setOracleNumCores(16).getOracleNumCores());
     Assert.assertFalse(config.setMiniStartAccumulo(false).getMiniStartAccumulo());
     Assert.assertEquals("mydata", config.setMiniDataDir("mydata").getMiniDataDir());
     Assert.assertEquals(17, config.setClientRetryTimeout(17).getClientRetryTimeout());
@@ -338,22 +326,20 @@ public class FluoConfigurationTest {
   @Test
   public void testCopyConfig() {
     FluoConfiguration c1 = new FluoConfiguration();
-    c1.setOracleNumCores(1);
-    Assert.assertEquals(1, c1.getOracleNumCores());
+    c1.setWorkerThreads(1);
+    Assert.assertEquals(1, c1.getWorkerThreads());
     FluoConfiguration c2 = new FluoConfiguration(c1);
-    Assert.assertEquals(1, c2.getOracleNumCores());
-    c2.setOracleNumCores(2);
-    Assert.assertEquals(2, c2.getOracleNumCores());
-    Assert.assertEquals(1, c1.getOracleNumCores());
+    Assert.assertEquals(1, c2.getWorkerThreads());
+    c2.setWorkerThreads(2);
+    Assert.assertEquals(2, c2.getWorkerThreads());
+    Assert.assertEquals(1, c1.getWorkerThreads());
   }
 
   @Test
   public void testIAE() {
     FluoConfiguration config = new FluoConfiguration();
     String[] positiveIntMethods =
-        {"setLoaderQueueSize", "setLoaderThreads", "setOracleInstances", "setOracleMaxMemory",
-            "setOracleNumCores", "setWorkerInstances", "setWorkerMaxMemory", "setWorkerNumCores",
-            "setWorkerThreads", "setZookeeperTimeout"};
+        {"setLoaderQueueSize", "setLoaderThreads", "setWorkerThreads", "setZookeeperTimeout"};
     for (String methodName : positiveIntMethods) {
       try {
         config.getClass().getMethod(methodName, int.class).invoke(config, -5);

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
index ac0da34..ca9afe0 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
@@ -43,6 +43,7 @@ import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.api.iterator.ColumnIterator;
 import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.cluster.util.FluoYarnConfig;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.Notification;
 import org.apache.fluo.core.util.AccumuloUtil;
@@ -271,7 +272,7 @@ public abstract class AppRunner {
         }
 
         try {
-          long sleepSec = calculateSleep(ntfyCount, config.getWorkerInstances());
+          long sleepSec = calculateSleep(ntfyCount, FluoYarnConfig.getWorkerInstances(config));
           log.info("{} notifications are still outstanding.  Will try again in {} seconds...",
               ntfyCount, sleepSec);
           Thread.sleep(1000 * sleepSec);

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
index dd1d068..6c52292 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
@@ -32,6 +32,7 @@ import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.cluster.runnable.OracleRunnable;
 import org.apache.fluo.cluster.runnable.WorkerRunnable;
+import org.apache.fluo.cluster.util.FluoYarnConfig;
 import org.apache.fluo.cluster.yarn.FluoTwillApp;
 import org.apache.fluo.cluster.yarn.TwillUtil;
 import org.apache.fluo.core.client.FluoAdminImpl;
@@ -331,17 +332,17 @@ public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
   }
 
   private boolean allContainersRunning(TwillController controller, FluoConfiguration config) {
-    return TwillUtil.numRunning(controller, OracleRunnable.ORACLE_NAME) == config
-        .getOracleInstances()
-        && TwillUtil.numRunning(controller, WorkerRunnable.WORKER_NAME) == config
-            .getWorkerInstances();
+    return TwillUtil.numRunning(controller, OracleRunnable.ORACLE_NAME) == FluoYarnConfig
+        .getOracleInstances(config)
+        && TwillUtil.numRunning(controller, WorkerRunnable.WORKER_NAME) == FluoYarnConfig
+            .getWorkerInstances(config);
   }
 
   private String containerStatus(TwillController controller, FluoConfiguration config) {
     return "" + TwillUtil.numRunning(controller, OracleRunnable.ORACLE_NAME) + " of "
-        + config.getOracleInstances() + " Oracle containers and "
+        + FluoYarnConfig.getOracleInstances(config) + " Oracle containers and "
         + TwillUtil.numRunning(controller, WorkerRunnable.WORKER_NAME) + " of "
-        + config.getWorkerInstances() + " Worker containers";
+        + FluoYarnConfig.getWorkerInstances(config) + " Worker containers";
   }
 
   public void status(FluoConfiguration config, boolean extraInfo) {
@@ -373,12 +374,12 @@ public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
         Collection<TwillRunResources> resources;
         resources = report.getRunnableResources(OracleRunnable.ORACLE_NAME);
         System.out.println("\nThe application has " + resources.size() + " of "
-            + config.getOracleInstances() + " desired Oracle containers:\n");
+            + FluoYarnConfig.getOracleInstances(config) + " desired Oracle containers:\n");
         TwillUtil.printResources(resources);
 
         resources = report.getRunnableResources(WorkerRunnable.WORKER_NAME);
         System.out.println("\nThe application has " + resources.size() + " of "
-            + config.getWorkerInstances() + " desired Worker containers:\n");
+            + FluoYarnConfig.getWorkerInstances(config) + " desired Worker containers:\n");
         TwillUtil.printResources(resources);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java
new file mode 100644
index 0000000..f078c44
--- /dev/null
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.cluster.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.fluo.api.config.FluoConfiguration;
+
+public class FluoYarnConfig {
+
+  private static final String YARN_PREFIX = FluoConfiguration.FLUO_PREFIX + ".yarn";
+  public static final String WORKER_INSTANCES_PROP = YARN_PREFIX + ".worker.instances";
+  public static final String WORKER_MAX_MEMORY_MB_PROP = YARN_PREFIX + ".worker.max.memory.mb";
+  public static final String WORKER_NUM_CORES_PROP = YARN_PREFIX + ".worker.num.cores";
+  public static final int WORKER_INSTANCES_DEFAULT = 1;
+  public static final int WORKER_MAX_MEMORY_MB_DEFAULT = 1024;
+  public static final int WORKER_NUM_CORES_DEFAULT = 1;
+
+  public static final String ORACLE_INSTANCES_PROP = YARN_PREFIX + ".oracle.instances";
+  public static final String ORACLE_MAX_MEMORY_MB_PROP = YARN_PREFIX + ".oracle.max.memory.mb";
+  public static final String ORACLE_NUM_CORES_PROP = YARN_PREFIX + ".oracle.num.cores";
+  public static final int ORACLE_INSTANCES_DEFAULT = 1;
+  public static final int ORACLE_MAX_MEMORY_MB_DEFAULT = 512;
+  public static final int ORACLE_NUM_CORES_DEFAULT = 1;
+
+  public static int getWorkerInstances(FluoConfiguration config) {
+    return getPositiveInt(config, WORKER_INSTANCES_PROP, WORKER_INSTANCES_DEFAULT);
+  }
+
+  public static int getWorkerMaxMemory(FluoConfiguration config) {
+    return getPositiveInt(config, WORKER_MAX_MEMORY_MB_PROP, WORKER_MAX_MEMORY_MB_DEFAULT);
+  }
+
+  public static int getWorkerNumCores(FluoConfiguration config) {
+    return getPositiveInt(config, WORKER_NUM_CORES_PROP, WORKER_NUM_CORES_DEFAULT);
+  }
+
+  public static int getOracleMaxMemory(FluoConfiguration config) {
+    return getPositiveInt(config, ORACLE_MAX_MEMORY_MB_PROP, ORACLE_MAX_MEMORY_MB_DEFAULT);
+  }
+
+  public static int getOracleInstances(FluoConfiguration config) {
+    return getPositiveInt(config, ORACLE_INSTANCES_PROP, ORACLE_INSTANCES_DEFAULT);
+  }
+
+  public static int getOracleNumCores(FluoConfiguration config) {
+    return getPositiveInt(config, ORACLE_NUM_CORES_PROP, ORACLE_NUM_CORES_DEFAULT);
+  }
+
+  private static int getPositiveInt(FluoConfiguration config, String property, int defaultValue) {
+    int value = config.getInt(property, defaultValue);
+    Preconditions.checkArgument(value > 0, property + " must be positive");
+    return value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
index 352a1b6..02f27fd 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
@@ -21,6 +21,7 @@ import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.cluster.runnable.OracleRunnable;
 import org.apache.fluo.cluster.runnable.WorkerRunnable;
 import org.apache.fluo.cluster.runner.YarnAppRunner;
+import org.apache.fluo.cluster.util.FluoYarnConfig;
 import org.apache.twill.api.ResourceSpecification;
 import org.apache.twill.api.ResourceSpecification.SizeUnit;
 import org.apache.twill.api.TwillApplication;
@@ -69,14 +70,21 @@ public class FluoTwillApp implements TwillApplication {
   @Override
   public TwillSpecification configure() {
 
+    final int oracleInstances = FluoYarnConfig.getOracleInstances(config);
+    final int oracleMaxMemory = FluoYarnConfig.getOracleMaxMemory(config);
+    final int oracleNumCores = FluoYarnConfig.getOracleNumCores(config);
+    final int workerInstances = FluoYarnConfig.getWorkerInstances(config);
+    final int workerMaxMemory = FluoYarnConfig.getWorkerMaxMemory(config);
+    final int workerNumCores = FluoYarnConfig.getWorkerNumCores(config);
+
     log.info("Configuring Fluo '{}' application with {} Oracle instances and {} Worker instances "
-        + "with following properties:", config.getApplicationName(), config.getOracleInstances(),
-        config.getWorkerInstances());
+        + "with following properties:", config.getApplicationName(), oracleInstances,
+        workerInstances);
 
-    log.info("{} = {}", FluoConfiguration.ORACLE_MAX_MEMORY_MB_PROP, config.getOracleMaxMemory());
-    log.info("{} = {}", FluoConfiguration.WORKER_MAX_MEMORY_MB_PROP, config.getWorkerMaxMemory());
-    log.info("{} = {}", FluoConfiguration.ORACLE_NUM_CORES_PROP, config.getOracleNumCores());
-    log.info("{} = {}", FluoConfiguration.WORKER_NUM_CORES_PROP, config.getWorkerNumCores());
+    log.info("{} = {}", FluoYarnConfig.ORACLE_MAX_MEMORY_MB_PROP, oracleMaxMemory);
+    log.info("{} = {}", FluoYarnConfig.WORKER_MAX_MEMORY_MB_PROP, workerMaxMemory);
+    log.info("{} = {}", FluoYarnConfig.ORACLE_NUM_CORES_PROP, oracleNumCores);
+    log.info("{} = {}", FluoYarnConfig.WORKER_NUM_CORES_PROP, workerNumCores);
 
     // Start building Fluo Twill application
     MoreRunnable moreRunnable =
@@ -86,9 +94,8 @@ public class FluoTwillApp implements TwillApplication {
 
     // Configure Oracle(s)
     ResourceSpecification oracleResources =
-        ResourceSpecification.Builder.with().setVirtualCores(config.getOracleNumCores())
-            .setMemory(config.getOracleMaxMemory(), SizeUnit.MEGA)
-            .setInstances(config.getOracleInstances()).build();
+        ResourceSpecification.Builder.with().setVirtualCores(oracleNumCores)
+            .setMemory(oracleMaxMemory, SizeUnit.MEGA).setInstances(oracleInstances).build();
 
     LocalFileAdder fileAdder =
         moreRunnable.add(OracleRunnable.ORACLE_NAME, new OracleRunnable(), oracleResources)
@@ -97,9 +104,8 @@ public class FluoTwillApp implements TwillApplication {
 
     // Configure Worker(s)
     ResourceSpecification workerResources =
-        ResourceSpecification.Builder.with().setVirtualCores(config.getWorkerNumCores())
-            .setMemory(config.getWorkerMaxMemory(), SizeUnit.MEGA)
-            .setInstances(config.getWorkerInstances()).build();
+        ResourceSpecification.Builder.with().setVirtualCores(workerNumCores)
+            .setMemory(workerMaxMemory, SizeUnit.MEGA).setInstances(workerInstances).build();
 
     fileAdder =
         runnableSetter.add(WorkerRunnable.WORKER_NAME, new WorkerRunnable(), workerResources)

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/distribution/src/main/config/fluo.properties
----------------------------------------------------------------------
diff --git a/modules/distribution/src/main/config/fluo.properties b/modules/distribution/src/main/config/fluo.properties
index 9d4e8a5..7644f4e 100644
--- a/modules/distribution/src/main/config/fluo.properties
+++ b/modules/distribution/src/main/config/fluo.properties
@@ -72,18 +72,8 @@ org.apache.fluo.admin.accumulo.classpath=${org.apache.fluo.admin.hdfs.root}/fluo
 
 # Worker properties
 # -----------------
-# Number of worker yarn instances
-#org.apache.fluo.worker.instances=1
 # Number of threads in each worker instance
 #org.apache.fluo.worker.num.threads=10
-# Max memory of worker YARN containers (in MB). If YARN is killing worker processes consider
-# increasing twill.java.reserved.memory.mb (which defaults to 200 and is set in yarn-site.xml).
-# The twill.java.reserved.memory.mb config determines the gap between the YARN memory limit set
-# below and the java -Xmx setting.  For example, if max memory is 1024 and twill reserved memory
-# is 200, the java -Xmx setting will be 1024-200 = 824 MB.
-#org.apache.fluo.worker.max.memory.mb=1024
-# Number of worker virtual cores
-#org.apache.fluo.worker.num.cores=1
 
 # Loader properties
 # -----------------
@@ -94,14 +84,24 @@ org.apache.fluo.admin.accumulo.classpath=${org.apache.fluo.admin.hdfs.root}/fluo
 # Queue size of loader
 #org.apache.fluo.loader.queue.size=10
 
-# Oracle properties
-# -----------------
+# YARN properties
+# ----------------
 # Number of oracle yarn instances
-#org.apache.fluo.oracle.instances=1
+#org.apache.fluo.yarn.oracle.instances=1
 # Max memory of Oracle yarn containers (in MB)
-#org.apache.fluo.oracle.max.memory.mb=512
+#org.apache.fluo.yarn.oracle.max.memory.mb=512
 # Number of oracle virtual cores
-#org.apache.fluo.oracle.num.cores=1
+#org.apache.fluo.yarn.oracle.num.cores=1
+# Number of worker yarn instances
+#org.apache.fluo.yarn.worker.instances=1
+# Max memory of worker YARN containers (in MB). If YARN is killing worker processes consider
+# increasing twill.java.reserved.memory.mb (which defaults to 200 and is set in yarn-site.xml).
+# The twill.java.reserved.memory.mb config determines the gap between the YARN memory limit set
+# below and the java -Xmx setting.  For example, if max memory is 1024 and twill reserved memory
+# is 200, the java -Xmx setting will be 1024-200 = 824 MB.
+#org.apache.fluo.yarn.worker.max.memory.mb=1024
+# Number of worker virtual cores
+#org.apache.fluo.yarn.worker.num.cores=1
 
 #Metrics
 #------------------