You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by mw...@apache.org on 2016/07/01 14:32:05 UTC
incubator-fluo git commit: Fixes #692 - Move YARN specific configuration out of FluoConfiguration
Repository: incubator-fluo
Updated Branches:
refs/heads/master 734061bd4 -> 31f1c1e6b
Fixes #692 - Move YARN specific configuration out of FluoConfiguration
Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/31f1c1e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/31f1c1e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/31f1c1e6
Branch: refs/heads/master
Commit: 31f1c1e6b571bb6f60dee94e30f7dcb7e51f399a
Parents: 734061b
Author: Mike Walch <mw...@gmail.com>
Authored: Fri Jul 1 09:41:58 2016 -0400
Committer: Mike Walch <mw...@gmail.com>
Committed: Fri Jul 1 10:03:35 2016 -0400
----------------------------------------------------------------------
.../fluo/api/config/FluoConfiguration.java | 71 --------------------
.../fluo/api/config/FluoConfigurationTest.java | 28 ++------
.../apache/fluo/cluster/runner/AppRunner.java | 3 +-
.../fluo/cluster/runner/YarnAppRunner.java | 17 ++---
.../fluo/cluster/util/FluoYarnConfig.java | 67 ++++++++++++++++++
.../apache/fluo/cluster/yarn/FluoTwillApp.java | 30 +++++----
.../src/main/config/fluo.properties | 30 ++++-----
7 files changed, 118 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
index 9022fa5..f00dfca 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
@@ -71,13 +71,7 @@ public class FluoConfiguration extends CompositeConfiguration {
// Worker
private static final String WORKER_PREFIX = FLUO_PREFIX + ".worker";
public static final String WORKER_NUM_THREADS_PROP = WORKER_PREFIX + ".num.threads";
- public static final String WORKER_INSTANCES_PROP = WORKER_PREFIX + ".instances";
- public static final String WORKER_MAX_MEMORY_MB_PROP = WORKER_PREFIX + ".max.memory.mb";
- public static final String WORKER_NUM_CORES_PROP = WORKER_PREFIX + ".num.cores";
public static final int WORKER_NUM_THREADS_DEFAULT = 10;
- public static final int WORKER_INSTANCES_DEFAULT = 1;
- public static final int WORKER_MAX_MEMORY_MB_DEFAULT = 1024;
- public static final int WORKER_NUM_CORES_DEFAULT = 1;
// Loader
private static final String LOADER_PREFIX = FLUO_PREFIX + ".loader";
@@ -88,12 +82,6 @@ public class FluoConfiguration extends CompositeConfiguration {
// Oracle
private static final String ORACLE_PREFIX = FLUO_PREFIX + ".oracle";
- public static final String ORACLE_INSTANCES_PROP = ORACLE_PREFIX + ".instances";
- public static final String ORACLE_MAX_MEMORY_MB_PROP = ORACLE_PREFIX + ".max.memory.mb";
- public static final String ORACLE_NUM_CORES_PROP = ORACLE_PREFIX + ".num.cores";
- public static final int ORACLE_INSTANCES_DEFAULT = 1;
- public static final int ORACLE_MAX_MEMORY_MB_DEFAULT = 512;
- public static final int ORACLE_NUM_CORES_DEFAULT = 1;
// MiniFluo
private static final String MINI_PREFIX = FLUO_PREFIX + ".mini";
@@ -169,13 +157,7 @@ public class FluoConfiguration extends CompositeConfiguration {
getLoaderQueueSize();
getLoaderThreads();
getObserverConfig();
- getOracleInstances();
- getOracleMaxMemory();
- getOracleNumCores();
getTransactionRollbackTime();
- getWorkerInstances();
- getWorkerMaxMemory();
- getWorkerNumCores();
getWorkerThreads();
getZookeeperTimeout();
}
@@ -440,30 +422,6 @@ public class FluoConfiguration extends CompositeConfiguration {
return getPositiveLong(TRANSACTION_ROLLBACK_TIME_PROP, TRANSACTION_ROLLBACK_TIME_DEFAULT);
}
- public FluoConfiguration setWorkerInstances(int workerInstances) {
- return setPositiveInt(WORKER_INSTANCES_PROP, workerInstances);
- }
-
- public int getWorkerInstances() {
- return getPositiveInt(WORKER_INSTANCES_PROP, WORKER_INSTANCES_DEFAULT);
- }
-
- public FluoConfiguration setWorkerMaxMemory(int maxMemoryMB) {
- return setPositiveInt(WORKER_MAX_MEMORY_MB_PROP, maxMemoryMB);
- }
-
- public int getWorkerMaxMemory() {
- return getPositiveInt(WORKER_MAX_MEMORY_MB_PROP, WORKER_MAX_MEMORY_MB_DEFAULT);
- }
-
- public FluoConfiguration setWorkerNumCores(int numCores) {
- return setPositiveInt(WORKER_NUM_CORES_PROP, numCores);
- }
-
- public int getWorkerNumCores() {
- return getPositiveInt(WORKER_NUM_CORES_PROP, WORKER_NUM_CORES_DEFAULT);
- }
-
public FluoConfiguration setLoaderThreads(int numThreads) {
return setNonNegativeInt(LOADER_NUM_THREADS_PROP, numThreads);
}
@@ -480,30 +438,6 @@ public class FluoConfiguration extends CompositeConfiguration {
return getNonNegativeInt(LOADER_QUEUE_SIZE_PROP, LOADER_QUEUE_SIZE_DEFAULT);
}
- public FluoConfiguration setOracleMaxMemory(int oracleMaxMemory) {
- return setPositiveInt(ORACLE_MAX_MEMORY_MB_PROP, oracleMaxMemory);
- }
-
- public int getOracleMaxMemory() {
- return getPositiveInt(ORACLE_MAX_MEMORY_MB_PROP, ORACLE_MAX_MEMORY_MB_DEFAULT);
- }
-
- public FluoConfiguration setOracleInstances(int oracleInstances) {
- return setPositiveInt(ORACLE_INSTANCES_PROP, oracleInstances);
- }
-
- public int getOracleInstances() {
- return getPositiveInt(ORACLE_INSTANCES_PROP, ORACLE_INSTANCES_DEFAULT);
- }
-
- public FluoConfiguration setOracleNumCores(int numCores) {
- return setPositiveInt(ORACLE_NUM_CORES_PROP, numCores);
- }
-
- public int getOracleNumCores() {
- return getPositiveInt(ORACLE_NUM_CORES_PROP, ORACLE_NUM_CORES_DEFAULT);
- }
-
/**
* @param reporter The name of the reporter to get configuration for, i.e. console, jmx, graphite.
* @return A {@link SubsetConfiguration} using the prefix {@value #REPORTER_PREFIX} with the
@@ -672,14 +606,9 @@ public class FluoConfiguration extends CompositeConfiguration {
config.setProperty(CLIENT_ZOOKEEPER_TIMEOUT_PROP, CLIENT_ZOOKEEPER_TIMEOUT_DEFAULT);
config.setProperty(CLIENT_ACCUMULO_ZOOKEEPERS_PROP, CLIENT_ACCUMULO_ZOOKEEPERS_DEFAULT);
config.setProperty(WORKER_NUM_THREADS_PROP, WORKER_NUM_THREADS_DEFAULT);
- config.setProperty(WORKER_INSTANCES_PROP, WORKER_INSTANCES_DEFAULT);
- config.setProperty(WORKER_MAX_MEMORY_MB_PROP, WORKER_MAX_MEMORY_MB_DEFAULT);
- config.setProperty(WORKER_NUM_CORES_PROP, WORKER_NUM_CORES_DEFAULT);
config.setProperty(TRANSACTION_ROLLBACK_TIME_PROP, TRANSACTION_ROLLBACK_TIME_DEFAULT);
config.setProperty(LOADER_NUM_THREADS_PROP, LOADER_NUM_THREADS_DEFAULT);
config.setProperty(LOADER_QUEUE_SIZE_PROP, LOADER_QUEUE_SIZE_DEFAULT);
- config.setProperty(ORACLE_MAX_MEMORY_MB_PROP, ORACLE_MAX_MEMORY_MB_DEFAULT);
- config.setProperty(ORACLE_NUM_CORES_PROP, ORACLE_NUM_CORES_DEFAULT);
config.setProperty(MINI_START_ACCUMULO_PROP, MINI_START_ACCUMULO_DEFAULT);
config.setProperty(MINI_DATA_DIR_PROP, MINI_DATA_DIR_DEFAULT);
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
index 01b14e8..f8fb0c5 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
@@ -49,16 +49,10 @@ public class FluoConfigurationTest {
Assert.assertEquals(FluoConfiguration.ADMIN_ACCUMULO_CLASSPATH_DEFAULT,
base.getAccumuloClasspath());
Assert.assertEquals(FluoConfiguration.WORKER_NUM_THREADS_DEFAULT, base.getWorkerThreads());
- Assert.assertEquals(FluoConfiguration.WORKER_INSTANCES_DEFAULT, base.getWorkerInstances());
- Assert.assertEquals(FluoConfiguration.WORKER_MAX_MEMORY_MB_DEFAULT, base.getWorkerMaxMemory());
- Assert.assertEquals(FluoConfiguration.WORKER_NUM_CORES_DEFAULT, base.getWorkerNumCores());
Assert.assertEquals(FluoConfiguration.TRANSACTION_ROLLBACK_TIME_DEFAULT,
base.getTransactionRollbackTime());
Assert.assertEquals(FluoConfiguration.LOADER_NUM_THREADS_DEFAULT, base.getLoaderThreads());
Assert.assertEquals(FluoConfiguration.LOADER_QUEUE_SIZE_DEFAULT, base.getLoaderQueueSize());
- Assert.assertEquals(FluoConfiguration.ORACLE_INSTANCES_DEFAULT, base.getOracleInstances());
- Assert.assertEquals(FluoConfiguration.ORACLE_MAX_MEMORY_MB_DEFAULT, base.getOracleMaxMemory());
- Assert.assertEquals(FluoConfiguration.ORACLE_NUM_CORES_DEFAULT, base.getOracleNumCores());
Assert.assertEquals(FluoConfiguration.MINI_START_ACCUMULO_DEFAULT, base.getMiniStartAccumulo());
Assert.assertTrue(base.getMiniDataDir().endsWith("/mini"));
}
@@ -96,18 +90,12 @@ public class FluoConfigurationTest {
Assert.assertEquals(0, config.setLoaderQueueSize(0).getLoaderQueueSize());
Assert.assertEquals(7, config.setLoaderThreads(7).getLoaderThreads());
Assert.assertEquals(0, config.setLoaderThreads(0).getLoaderThreads());
- Assert.assertEquals(8, config.setOracleMaxMemory(8).getOracleMaxMemory());
- Assert.assertEquals(10, config.setOracleInstances(10).getOracleInstances());
- Assert.assertEquals(11, config.setWorkerInstances(11).getWorkerInstances());
- Assert.assertEquals(12, config.setWorkerMaxMemory(12).getWorkerMaxMemory());
Assert.assertEquals(13, config.setWorkerThreads(13).getWorkerThreads());
Assert.assertEquals("zoos1", config.setInstanceZookeepers("zoos1").getInstanceZookeepers());
Assert.assertEquals("zoos2", config.setAccumuloZookeepers("zoos2").getAccumuloZookeepers());
Assert.assertEquals("app", config.setApplicationName("app").getApplicationName());
Assert.assertEquals("zoos1/app", config.getAppZookeepers());
Assert.assertEquals(14, config.setZookeeperTimeout(14).getZookeeperTimeout());
- Assert.assertEquals(15, config.setWorkerNumCores(15).getWorkerNumCores());
- Assert.assertEquals(16, config.setOracleNumCores(16).getOracleNumCores());
Assert.assertFalse(config.setMiniStartAccumulo(false).getMiniStartAccumulo());
Assert.assertEquals("mydata", config.setMiniDataDir("mydata").getMiniDataDir());
Assert.assertEquals(17, config.setClientRetryTimeout(17).getClientRetryTimeout());
@@ -338,22 +326,20 @@ public class FluoConfigurationTest {
@Test
public void testCopyConfig() {
FluoConfiguration c1 = new FluoConfiguration();
- c1.setOracleNumCores(1);
- Assert.assertEquals(1, c1.getOracleNumCores());
+ c1.setWorkerThreads(1);
+ Assert.assertEquals(1, c1.getWorkerThreads());
FluoConfiguration c2 = new FluoConfiguration(c1);
- Assert.assertEquals(1, c2.getOracleNumCores());
- c2.setOracleNumCores(2);
- Assert.assertEquals(2, c2.getOracleNumCores());
- Assert.assertEquals(1, c1.getOracleNumCores());
+ Assert.assertEquals(1, c2.getWorkerThreads());
+ c2.setWorkerThreads(2);
+ Assert.assertEquals(2, c2.getWorkerThreads());
+ Assert.assertEquals(1, c1.getWorkerThreads());
}
@Test
public void testIAE() {
FluoConfiguration config = new FluoConfiguration();
String[] positiveIntMethods =
- {"setLoaderQueueSize", "setLoaderThreads", "setOracleInstances", "setOracleMaxMemory",
- "setOracleNumCores", "setWorkerInstances", "setWorkerMaxMemory", "setWorkerNumCores",
- "setWorkerThreads", "setZookeeperTimeout"};
+ {"setLoaderQueueSize", "setLoaderThreads", "setWorkerThreads", "setZookeeperTimeout"};
for (String methodName : positiveIntMethods) {
try {
config.getClass().getMethod(methodName, int.class).invoke(config, -5);
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
index ac0da34..ca9afe0 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
@@ -43,6 +43,7 @@ import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.exceptions.FluoException;
import org.apache.fluo.api.iterator.ColumnIterator;
import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.cluster.util.FluoYarnConfig;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.util.AccumuloUtil;
@@ -271,7 +272,7 @@ public abstract class AppRunner {
}
try {
- long sleepSec = calculateSleep(ntfyCount, config.getWorkerInstances());
+ long sleepSec = calculateSleep(ntfyCount, FluoYarnConfig.getWorkerInstances(config));
log.info("{} notifications are still outstanding. Will try again in {} seconds...",
ntfyCount, sleepSec);
Thread.sleep(1000 * sleepSec);
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
index dd1d068..6c52292 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
@@ -32,6 +32,7 @@ import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.exceptions.FluoException;
import org.apache.fluo.cluster.runnable.OracleRunnable;
import org.apache.fluo.cluster.runnable.WorkerRunnable;
+import org.apache.fluo.cluster.util.FluoYarnConfig;
import org.apache.fluo.cluster.yarn.FluoTwillApp;
import org.apache.fluo.cluster.yarn.TwillUtil;
import org.apache.fluo.core.client.FluoAdminImpl;
@@ -331,17 +332,17 @@ public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
}
private boolean allContainersRunning(TwillController controller, FluoConfiguration config) {
- return TwillUtil.numRunning(controller, OracleRunnable.ORACLE_NAME) == config
- .getOracleInstances()
- && TwillUtil.numRunning(controller, WorkerRunnable.WORKER_NAME) == config
- .getWorkerInstances();
+ return TwillUtil.numRunning(controller, OracleRunnable.ORACLE_NAME) == FluoYarnConfig
+ .getOracleInstances(config)
+ && TwillUtil.numRunning(controller, WorkerRunnable.WORKER_NAME) == FluoYarnConfig
+ .getWorkerInstances(config);
}
private String containerStatus(TwillController controller, FluoConfiguration config) {
return "" + TwillUtil.numRunning(controller, OracleRunnable.ORACLE_NAME) + " of "
- + config.getOracleInstances() + " Oracle containers and "
+ + FluoYarnConfig.getOracleInstances(config) + " Oracle containers and "
+ TwillUtil.numRunning(controller, WorkerRunnable.WORKER_NAME) + " of "
- + config.getWorkerInstances() + " Worker containers";
+ + FluoYarnConfig.getWorkerInstances(config) + " Worker containers";
}
public void status(FluoConfiguration config, boolean extraInfo) {
@@ -373,12 +374,12 @@ public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
Collection<TwillRunResources> resources;
resources = report.getRunnableResources(OracleRunnable.ORACLE_NAME);
System.out.println("\nThe application has " + resources.size() + " of "
- + config.getOracleInstances() + " desired Oracle containers:\n");
+ + FluoYarnConfig.getOracleInstances(config) + " desired Oracle containers:\n");
TwillUtil.printResources(resources);
resources = report.getRunnableResources(WorkerRunnable.WORKER_NAME);
System.out.println("\nThe application has " + resources.size() + " of "
- + config.getWorkerInstances() + " desired Worker containers:\n");
+ + FluoYarnConfig.getWorkerInstances(config) + " desired Worker containers:\n");
TwillUtil.printResources(resources);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java
new file mode 100644
index 0000000..f078c44
--- /dev/null
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.cluster.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.fluo.api.config.FluoConfiguration;
+
+public class FluoYarnConfig {
+
+ private static final String YARN_PREFIX = FluoConfiguration.FLUO_PREFIX + ".yarn";
+ public static final String WORKER_INSTANCES_PROP = YARN_PREFIX + ".worker.instances";
+ public static final String WORKER_MAX_MEMORY_MB_PROP = YARN_PREFIX + ".worker.max.memory.mb";
+ public static final String WORKER_NUM_CORES_PROP = YARN_PREFIX + ".worker.num.cores";
+ public static final int WORKER_INSTANCES_DEFAULT = 1;
+ public static final int WORKER_MAX_MEMORY_MB_DEFAULT = 1024;
+ public static final int WORKER_NUM_CORES_DEFAULT = 1;
+
+ public static final String ORACLE_INSTANCES_PROP = YARN_PREFIX + ".oracle.instances";
+ public static final String ORACLE_MAX_MEMORY_MB_PROP = YARN_PREFIX + ".oracle.max.memory.mb";
+ public static final String ORACLE_NUM_CORES_PROP = YARN_PREFIX + ".oracle.num.cores";
+ public static final int ORACLE_INSTANCES_DEFAULT = 1;
+ public static final int ORACLE_MAX_MEMORY_MB_DEFAULT = 512;
+ public static final int ORACLE_NUM_CORES_DEFAULT = 1;
+
+ public static int getWorkerInstances(FluoConfiguration config) {
+ return getPositiveInt(config, WORKER_INSTANCES_PROP, WORKER_INSTANCES_DEFAULT);
+ }
+
+ public static int getWorkerMaxMemory(FluoConfiguration config) {
+ return getPositiveInt(config, WORKER_MAX_MEMORY_MB_PROP, WORKER_MAX_MEMORY_MB_DEFAULT);
+ }
+
+ public static int getWorkerNumCores(FluoConfiguration config) {
+ return getPositiveInt(config, WORKER_NUM_CORES_PROP, WORKER_NUM_CORES_DEFAULT);
+ }
+
+ public static int getOracleMaxMemory(FluoConfiguration config) {
+ return getPositiveInt(config, ORACLE_MAX_MEMORY_MB_PROP, ORACLE_MAX_MEMORY_MB_DEFAULT);
+ }
+
+ public static int getOracleInstances(FluoConfiguration config) {
+ return getPositiveInt(config, ORACLE_INSTANCES_PROP, ORACLE_INSTANCES_DEFAULT);
+ }
+
+ public static int getOracleNumCores(FluoConfiguration config) {
+ return getPositiveInt(config, ORACLE_NUM_CORES_PROP, ORACLE_NUM_CORES_DEFAULT);
+ }
+
+ private static int getPositiveInt(FluoConfiguration config, String property, int defaultValue) {
+ int value = config.getInt(property, defaultValue);
+ Preconditions.checkArgument(value > 0, property + " must be positive");
+ return value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
index 352a1b6..02f27fd 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
@@ -21,6 +21,7 @@ import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.cluster.runnable.OracleRunnable;
import org.apache.fluo.cluster.runnable.WorkerRunnable;
import org.apache.fluo.cluster.runner.YarnAppRunner;
+import org.apache.fluo.cluster.util.FluoYarnConfig;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.ResourceSpecification.SizeUnit;
import org.apache.twill.api.TwillApplication;
@@ -69,14 +70,21 @@ public class FluoTwillApp implements TwillApplication {
@Override
public TwillSpecification configure() {
+ final int oracleInstances = FluoYarnConfig.getOracleInstances(config);
+ final int oracleMaxMemory = FluoYarnConfig.getOracleMaxMemory(config);
+ final int oracleNumCores = FluoYarnConfig.getOracleNumCores(config);
+ final int workerInstances = FluoYarnConfig.getWorkerInstances(config);
+ final int workerMaxMemory = FluoYarnConfig.getWorkerMaxMemory(config);
+ final int workerNumCores = FluoYarnConfig.getWorkerNumCores(config);
+
log.info("Configuring Fluo '{}' application with {} Oracle instances and {} Worker instances "
- + "with following properties:", config.getApplicationName(), config.getOracleInstances(),
- config.getWorkerInstances());
+ + "with following properties:", config.getApplicationName(), oracleInstances,
+ workerInstances);
- log.info("{} = {}", FluoConfiguration.ORACLE_MAX_MEMORY_MB_PROP, config.getOracleMaxMemory());
- log.info("{} = {}", FluoConfiguration.WORKER_MAX_MEMORY_MB_PROP, config.getWorkerMaxMemory());
- log.info("{} = {}", FluoConfiguration.ORACLE_NUM_CORES_PROP, config.getOracleNumCores());
- log.info("{} = {}", FluoConfiguration.WORKER_NUM_CORES_PROP, config.getWorkerNumCores());
+ log.info("{} = {}", FluoYarnConfig.ORACLE_MAX_MEMORY_MB_PROP, oracleMaxMemory);
+ log.info("{} = {}", FluoYarnConfig.WORKER_MAX_MEMORY_MB_PROP, workerMaxMemory);
+ log.info("{} = {}", FluoYarnConfig.ORACLE_NUM_CORES_PROP, oracleNumCores);
+ log.info("{} = {}", FluoYarnConfig.WORKER_NUM_CORES_PROP, workerNumCores);
// Start building Fluo Twill application
MoreRunnable moreRunnable =
@@ -86,9 +94,8 @@ public class FluoTwillApp implements TwillApplication {
// Configure Oracle(s)
ResourceSpecification oracleResources =
- ResourceSpecification.Builder.with().setVirtualCores(config.getOracleNumCores())
- .setMemory(config.getOracleMaxMemory(), SizeUnit.MEGA)
- .setInstances(config.getOracleInstances()).build();
+ ResourceSpecification.Builder.with().setVirtualCores(oracleNumCores)
+ .setMemory(oracleMaxMemory, SizeUnit.MEGA).setInstances(oracleInstances).build();
LocalFileAdder fileAdder =
moreRunnable.add(OracleRunnable.ORACLE_NAME, new OracleRunnable(), oracleResources)
@@ -97,9 +104,8 @@ public class FluoTwillApp implements TwillApplication {
// Configure Worker(s)
ResourceSpecification workerResources =
- ResourceSpecification.Builder.with().setVirtualCores(config.getWorkerNumCores())
- .setMemory(config.getWorkerMaxMemory(), SizeUnit.MEGA)
- .setInstances(config.getWorkerInstances()).build();
+ ResourceSpecification.Builder.with().setVirtualCores(workerNumCores)
+ .setMemory(workerMaxMemory, SizeUnit.MEGA).setInstances(workerInstances).build();
fileAdder =
runnableSetter.add(WorkerRunnable.WORKER_NAME, new WorkerRunnable(), workerResources)
http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/31f1c1e6/modules/distribution/src/main/config/fluo.properties
----------------------------------------------------------------------
diff --git a/modules/distribution/src/main/config/fluo.properties b/modules/distribution/src/main/config/fluo.properties
index 9d4e8a5..7644f4e 100644
--- a/modules/distribution/src/main/config/fluo.properties
+++ b/modules/distribution/src/main/config/fluo.properties
@@ -72,18 +72,8 @@ org.apache.fluo.admin.accumulo.classpath=${org.apache.fluo.admin.hdfs.root}/fluo
# Worker properties
# -----------------
-# Number of worker yarn instances
-#org.apache.fluo.worker.instances=1
# Number of threads in each worker instance
#org.apache.fluo.worker.num.threads=10
-# Max memory of worker YARN containers (in MB). If YARN is killing worker processes consider
-# increasing twill.java.reserved.memory.mb (which defaults to 200 and is set in yarn-site.xml).
-# The twill.java.reserved.memory.mb config determines the gap between the YARN memory limit set
-# below and the java -Xmx setting. For example, if max memory is 1024 and twill reserved memory
-# is 200, the java -Xmx setting will be 1024-200 = 824 MB.
-#org.apache.fluo.worker.max.memory.mb=1024
-# Number of worker virtual cores
-#org.apache.fluo.worker.num.cores=1
# Loader properties
# -----------------
@@ -94,14 +84,24 @@ org.apache.fluo.admin.accumulo.classpath=${org.apache.fluo.admin.hdfs.root}/fluo
# Queue size of loader
#org.apache.fluo.loader.queue.size=10
-# Oracle properties
-# -----------------
+# YARN properties
+# ----------------
# Number of oracle yarn instances
-#org.apache.fluo.oracle.instances=1
+#org.apache.fluo.yarn.oracle.instances=1
# Max memory of Oracle yarn containers (in MB)
-#org.apache.fluo.oracle.max.memory.mb=512
+#org.apache.fluo.yarn.oracle.max.memory.mb=512
# Number of oracle virtual cores
-#org.apache.fluo.oracle.num.cores=1
+#org.apache.fluo.yarn.oracle.num.cores=1
+# Number of worker yarn instances
+#org.apache.fluo.yarn.worker.instances=1
+# Max memory of worker YARN containers (in MB). If YARN is killing worker processes consider
+# increasing twill.java.reserved.memory.mb (which defaults to 200 and is set in yarn-site.xml).
+# The twill.java.reserved.memory.mb config determines the gap between the YARN memory limit set
+# below and the java -Xmx setting. For example, if max memory is 1024 and twill reserved memory
+# is 200, the java -Xmx setting will be 1024-200 = 824 MB.
+#org.apache.fluo.yarn.worker.max.memory.mb=1024
+# Number of worker virtual cores
+#org.apache.fluo.yarn.worker.num.cores=1
#Metrics
#------------------