You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by "Bob Tiernay (JIRA)" <ji...@apache.org> on 2015/05/11 00:19:59 UTC
[jira] [Commented] (STORM-812) Create Testing#createLocalCluster
[ https://issues.apache.org/jira/browse/STORM-812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14537397#comment-14537397 ]
Bob Tiernay commented on STORM-812:
-----------------------------------
The other option is to add a new constructor to {{LocalCluster}} that accepts a {{MkClusterParam}}.
> Create Testing#createLocalCluster
> ---------------------------------
>
> Key: STORM-812
> URL: https://issues.apache.org/jira/browse/STORM-812
> Project: Apache Storm
> Issue Type: Improvement
> Reporter: Bob Tiernay
>
> Currently, there is a method called {{Testing#withLocalCluster}} that allows one to customize the configuration of a {{LocalCluster}} that isn't possible using {{new LocalCluster}}. In fact, as far as I can tell this is the only way to specify the number supervisors and number of workers per supervisor. However, this method controls the life-cycle of the cluster and thus is very inconvenient when that is not desired. This has been discussed in
> http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy
> The workaround required is less than ideal:
> {code}
> @NonNull
> public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
> val daemonConf = new Config();
> daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
> daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
> daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);
> val clusterParams = new MkClusterParam();
> clusterParams.setSupervisors(5);
> clusterParams.setPortsPerSupervisor(5);
> clusterParams.setDaemonConf(daemonConf);
> return createLocalCluster(clusterParams);
> }
> /**
> * Hack to override local cluster workers.
> */
> @SneakyThrows
> private static LocalCluster createLocalCluster(final MkClusterParam clusterParams) {
> val reference = new AtomicReference<LocalCluster>();
> val latch = new CountDownLatch(1);
> val thread = new Thread(new Runnable() {
> @Override
> public void run() {
> Testing.withLocalCluster(clusterParams,
> new TestJob() {
> @Override
> @SneakyThrows
> public void run(final ILocalCluster cluster) {
> reference.set((LocalCluster) cluster);
> latch.countDown();
> // Wait forever
> synchronized (this) {
> while (true) {
> this.wait();
> }
> }
> }
> });
> }
> });
> thread.setDaemon(true);
> thread.start();
> latch.await();
> return reference.get();
> }
> }
> {code}
> For greater flexibility, a {{createLocalCluster}} should be created.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)