Posted to dev@storm.apache.org by "Bob Tiernay (JIRA)" <ji...@apache.org> on 2015/05/11 00:19:59 UTC

[jira] [Commented] (STORM-812) Create Testing#createLocalCluster

    [ https://issues.apache.org/jira/browse/STORM-812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14537397#comment-14537397 ] 

Bob Tiernay commented on STORM-812:
-----------------------------------

The other option is to add a new constructor to {{LocalCluster}} that accepts a {{MkClusterParam}}.
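
To illustrate the difference in life-cycle, here is a minimal, self-contained sketch. It does not use Storm at all: {{ClusterParams}}, {{Cluster}} and {{withCluster}} are hypothetical stand-ins for {{MkClusterParam}}, {{LocalCluster}} and {{Testing#withLocalCluster}}, and the constructor shown is only an assumption about what such an API might look like.

```java
import java.util.function.Consumer;

/** Sketch only: none of these types are real Storm classes. */
final class ClusterLifecycleSketch {

    /** Hypothetical stand-in for MkClusterParam. */
    static final class ClusterParams {
        final int supervisors;
        final int portsPerSupervisor;

        ClusterParams(int supervisors, int portsPerSupervisor) {
            this.supervisors = supervisors;
            this.portsPerSupervisor = portsPerSupervisor;
        }
    }

    /** Hypothetical stand-in for LocalCluster. */
    static final class Cluster implements AutoCloseable {
        private final ClusterParams params;
        private boolean running = true;

        // The proposed style: the caller passes the parameters directly
        // and owns the resulting cluster until it calls close().
        Cluster(ClusterParams params) {
            this.params = params;
        }

        int workerSlots() {
            return params.supervisors * params.portsPerSupervisor;
        }

        boolean isRunning() {
            return running;
        }

        @Override
        public void close() {
            running = false;
        }
    }

    /**
     * Callback style, analogous to withLocalCluster: the cluster is shut
     * down as soon as the job returns, so it cannot outlive the callback.
     */
    static void withCluster(ClusterParams params, Consumer<Cluster> job) {
        try (Cluster cluster = new Cluster(params)) {
            job.accept(cluster);
        }
    }

    public static void main(String[] args) {
        ClusterParams params = new ClusterParams(5, 5);

        // Callback style: a reference that escapes the job is already shut down.
        Cluster[] escaped = new Cluster[1];
        withCluster(params, c -> escaped[0] = c);
        System.out.println("escaped cluster running: " + escaped[0].isRunning());

        // Constructor style: the cluster stays up until the caller closes it.
        Cluster cluster = new Cluster(params);
        System.out.println("owned cluster running: " + cluster.isRunning());
        System.out.println("worker slots: " + cluster.workerSlots());
        cluster.close();
    }
}
```

With the constructor style the caller decides when to shut the cluster down, so no background thread or latch is needed to keep it alive.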

> Create Testing#createLocalCluster
> ---------------------------------
>
>                 Key: STORM-812
>                 URL: https://issues.apache.org/jira/browse/STORM-812
>             Project: Apache Storm
>          Issue Type: Improvement
>            Reporter: Bob Tiernay
>
> Currently, {{Testing#withLocalCluster}} allows one to customize the configuration of a {{LocalCluster}} in ways that are not possible using {{new LocalCluster}}. In fact, as far as I can tell, it is the only way to specify the number of supervisors and the number of workers per supervisor. However, this method controls the life-cycle of the cluster, which is very inconvenient when that is not desired. This has been discussed at:
> http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy
> The workaround required is less than ideal:
> {code}
>   @NonNull
>   public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
>     val daemonConf = new Config();
>     daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
>     daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
>     daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);
>     val clusterParams = new MkClusterParam();
>     clusterParams.setSupervisors(5);
>     clusterParams.setPortsPerSupervisor(5);
>     clusterParams.setDaemonConf(daemonConf);
>     return createLocalCluster(clusterParams);
>   }
>   /**
>    * Hack to override local cluster workers.
>    */
>   @SneakyThrows
>   private static LocalCluster createLocalCluster(final MkClusterParam clusterParams) {
>     val reference = new AtomicReference<LocalCluster>();
>     val latch = new CountDownLatch(1);
>     val thread = new Thread(new Runnable() {
>       @Override
>       public void run() {
>         Testing.withLocalCluster(clusterParams,
>             new TestJob() {
>               @Override
>               @SneakyThrows
>               public void run(final ILocalCluster cluster) {
>                 reference.set((LocalCluster) cluster);
>                 latch.countDown();
>                 // Block forever so that withLocalCluster never returns and shuts the cluster down
>                 synchronized (this) {
>                   while (true) {
>                     this.wait();
>                   }
>                 }
>               }
>             });
>       }
>     });
>     thread.setDaemon(true);
>     thread.start();
>     latch.await();
>     return reference.get();
>   }
> {code}
> For greater flexibility, a {{Testing#createLocalCluster}} method should be added that returns the cluster and leaves its life-cycle to the caller.


