Posted to dev@storm.apache.org by kamleshbhatt <gi...@git.apache.org> on 2016/09/07 03:10:25 UTC

[GitHub] storm pull request #1677: STORM-1985: Admin Commands

GitHub user kamleshbhatt opened a pull request:

    https://github.com/apache/storm/pull/1677

    STORM-1985: Admin Commands

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kamleshbhatt/storm master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1677.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1677
    
----
commit 5414768a1f03f03efb3b18cf1f13c28a1d661331
Author: kamleshbhatt <kb...@gmail.com>
Date:   2016-08-06T05:35:31Z

    Merge pull request #1 from apache/master
    
    Update

commit a6f807d259382093ed474d89148fa82a90117693
Author: kamleshbhatt <kb...@gmail.com>
Date:   2016-09-06T16:21:09Z

    Merge pull request #2 from apache/master
    
    Update

commit 8ebb8ca96784b565be4659fe0c9e7b19cdc6978e
Author: kamleshbhatt <kb...@gmail.com>
Date:   2016-09-07T02:57:35Z

    Admin Commands

commit 1ab0a0fa49acec384da46b7c3f557b5ed453e289
Author: kamleshbhatt <kb...@gmail.com>
Date:   2016-09-07T02:59:55Z

    Merge branch 'master' of https://github.com/kamleshbhatt/storm

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by kamleshbhatt <gi...@git.apache.org>.
Github user kamleshbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r81474369
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,133 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.BlobStore;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.blobstore.KeyFilter;
    +import org.apache.storm.blobstore.LocalFsBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.nimbus.NimbusInfo;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static BlobStore nimbusBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    +                throw new RuntimeException("" + command + " is not a supported admin command");
    +        }
    +
    +    }
    +
    +    private static void initialize() {
    +        Map conf = ConfigUtils.readStormConfig();
    +        BlobStore nimbusBlobStore = Utils.getNimbusBlobStore (conf, NimbusInfo.fromConf(conf));
    +        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    +        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
    +        List<ACL> acls = null;
    +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
    +            acls = adminZkAcls();
    +        }
    +        try {
    +            IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.NIMBUS));
    +        } catch (Exception e) {
    +            LOG.error("admin can't create stormClusterState");
    +            new RuntimeException(e);
    +        }
    +        CuratorFramework zk = Zookeeper.mkClient(conf, servers, port, "", new DefaultWatcherCallBack(),conf);
    +    }
    +
    +    // we might think of moving this method in Utils class
    +    private static List<ACL> adminZkAcls() {
    +        final List<ACL> acls = new ArrayList<>();
    +        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
    +        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
    +        return acls;
    +    }
    +
    +    private static Set<String> getKeyListFromId( String corruptId) {
    +        Set<String> keyLists = new HashSet<>();
    +        keyLists.add(ConfigUtils.masterStormCodeKey(corruptId));
    +        keyLists.add(ConfigUtils.masterStormConfKey(corruptId));
    +        if(!ConfigUtils.isLocalMode(conf)) {
    +            ConfigUtils.masterStormJarKey(corruptId);
    +        }
    +        return keyLists;
    +    }
    +
    +    private static void removeCorruptTopologies( ) {
    +        Iterator<String> corruptTopologies = listCorruptTopologies();
    +        while(corruptTopologies.hasNext()) {
    +            String corruptId = corruptTopologies.next();
    +            stormClusterState.removeStorm(corruptId);
    +            if(nimbusBlobStore instanceof LocalFsBlobStore) {
    +                Iterator<String> blobKeys = getKeyListFromId(corruptId).iterator();
    +                while(blobKeys.hasNext()) {
    +                    stormClusterState.removeBlobstoreKey(blobKeys.next());
    +                }
    +            }
    +        }
    +    }
    +
    +    private static Iterator<String> listCorruptTopologies() {
    +        Set<String> blobStoreTopologyIds = nimbusBlobStore.filterAndListKeys(new KeyFilter<String>() {
    --- End diff --
    
    Thanks for providing these steps. I fixed the issue and updated the pull request for review; please take a look.
    However, I also had to change storm.cmd and storm-config.cmd to make it work. Please review those changes as well.



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by kamleshbhatt <gi...@git.apache.org>.
Github user kamleshbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r80294599
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,133 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.BlobStore;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.blobstore.KeyFilter;
    +import org.apache.storm.blobstore.LocalFsBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.nimbus.NimbusInfo;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static BlobStore nimbusBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    --- End diff --
    
    Done. We might have to move the initialize method back out of the switch block, because once we have more commands, repeating the call in every case might not be a good idea.



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r77869729
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,114 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static ClientBlobStore clientBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    +                throw new RuntimeException("" + command + " is not a supported admin command");
    +        }
    +
    +    }
    +
    +    private static void initialize() {
    +        Map conf = ConfigUtils.readStormConfig();
    +        ClientBlobStore clientBlobStore = Utils.getClientBlobStore(conf);
    +        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    +        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
    +        List<ACL> acls = null;
    +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
    +            acls = adminZkAcls();
    +        }
    +        try {
    +            IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.UNKNOWN));
    +        } catch (Exception e) {
    +            LOG.error("admin can't create stormClusterState");
    +            throw Utils.wrapInRuntime(e);
    +        }
    +        CuratorFramework zk = Zookeeper.mkClient(conf, servers, port, "", new DefaultWatcherCallBack(),conf);
    +    }
    +
    +    // we might think of moving this method in Utils class
    +    private static List<ACL> adminZkAcls() {
    +        final List<ACL> acls = new ArrayList<>();
    +        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
    +        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
    +        return acls;
    +    }
    +
    +    private static void removeCorruptTopologies( ) {
    +        Iterator<String> corruptTopologies = listCorruptTopologies();
    +        while(corruptTopologies.hasNext()) {
    +            stormClusterState.removeStorm(corruptTopologies.next());
    +        }
    +    }
    +
    +    private static Iterator<String> listCorruptTopologies() {
    +        Iterator<String> blobStoreTopologyIds = clientBlobStore.listKeys();
    --- End diff --
    
    This is very different from the `code-ids` function:
    ```
    (defn code-ids [blob-store]
      (let [to-id (reify KeyFilter
                    (filter [this key] (ConfigUtils/getIdFromBlobKey key)))]
        (set (.filterAndListKeys blob-store to-id))))
    ```
    
    This lists all of the keys in the blob store, whereas `code-ids` converts each key into a topology id.
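
For comparison, here is a minimal Java sketch of the behaviour that `code-ids` is described as having: each blob key is mapped to a topology id, and keys that do not map to one are dropped. It is not Storm's implementation. `KeyFilter` is a local stand-in interface, `idFromBlobKey` is a hypothetical helper, and the three key suffixes are an assumption made for illustration; the real `ConfigUtils.getIdFromBlobKey` and `filterAndListKeys` may differ in detail.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CodeIdsSketch {

    /** Local stand-in for a key filter: maps a blob key to a value, or to null to drop it. */
    interface KeyFilter<R> {
        R filter(String key);
    }

    // Assumed blob-key suffixes, for illustration only.
    private static final List<String> SUFFIXES =
            Arrays.asList("-stormcode.ser", "-stormconf.ser", "-stormjar.jar");

    /** Hypothetical helper: returns the topology id for a topology blob key, else null. */
    static String idFromBlobKey(String key) {
        for (String suffix : SUFFIXES) {
            if (key.endsWith(suffix)) {
                return key.substring(0, key.length() - suffix.length());
            }
        }
        return null;
    }

    /** Applies the filter to every key and collects the non-null results into a set. */
    static <R> Set<R> filterAndListKeys(Iterable<String> keys, KeyFilter<R> filter) {
        Set<R> result = new HashSet<>();
        for (String key : keys) {
            R mapped = filter.filter(key);
            if (mapped != null) {
                result.add(mapped);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
                "topo-1-stormcode.ser", "topo-1-stormconf.ser", "some-user-blob");
        // Two keys collapse to one topology id; the unrelated blob is filtered out.
        Set<String> ids = filterAndListKeys(keys, new KeyFilter<String>() {
            @Override
            public String filter(String key) {
                return idFromBlobKey(key);
            }
        });
        System.out.println(ids);
    }
}
```

A plain key listing would return all three keys here, which is why comparing it directly against the active topology ids would not match anything.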



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r77871179
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,114 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static ClientBlobStore clientBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    --- End diff --
    
    I don't really like static global values.  For now this is fine, but if we add more commands this could become a real mess. I think it would be a lot cleaner to have an inner class that we instantiate to perform the operation for each subcommand.
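
One possible shape for the inner-class approach suggested above, as a rough sketch rather than the code that was eventually merged. The `AdminCommand` interface, the `RemoveCorruptTopologies` class body and the `dispatch` helper are all hypothetical names introduced for illustration; a real subcommand would create its own cluster-state and blob-store handles instead of printing a message.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

final class AdminCommandsSketch {

    /** One admin subcommand; each implementation owns its own state. */
    interface AdminCommand {
        void run(String[] args);
    }

    /** Hypothetical subcommand; real code would set up and use its own resources here. */
    static final class RemoveCorruptTopologies implements AdminCommand {
        @Override
        public void run(String[] args) {
            System.out.println("would remove corrupt topologies");
        }
    }

    /** Registry of supported subcommands, keyed by command name. */
    static Map<String, AdminCommand> commands() {
        Map<String, AdminCommand> commands = new LinkedHashMap<>();
        commands.put("remove_corrupt_topologies", new RemoveCorruptTopologies());
        return commands;
    }

    /** Looks up the subcommand named by args[0] and runs it with the remaining arguments. */
    static void dispatch(String[] args) {
        if (args.length == 0) {
            throw new IllegalArgumentException("Missing command.");
        }
        AdminCommand command = commands().get(args[0]);
        if (command == null) {
            throw new IllegalArgumentException(args[0] + " is not a supported admin command");
        }
        command.run(Arrays.copyOfRange(args, 1, args.length));
    }

    public static void main(String[] args) {
        dispatch(new String[] {"remove_corrupt_topologies"});
    }
}
```

With this layout no state is shared through static fields, and adding a command means adding one class and one registry entry.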



[GitHub] storm issue #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1677
  
    For reference, the original code that this is replacing was:
    ```
    (defn cleanup-corrupt-topologies! [nimbus]
      (let [storm-cluster-state (:storm-cluster-state nimbus)
            blob-store (:blob-store nimbus)
            code-ids (set (code-ids blob-store))
            active-topologies (set (.active-storms storm-cluster-state))
            corrupt-topologies (set/difference active-topologies code-ids)]
        (doseq [corrupt corrupt-topologies]
          (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
          (.remove-storm! storm-cluster-state corrupt)
          (if (instance? LocalFsBlobStore blob-store)
            (doseq [blob-key (get-key-list-from-id (:conf nimbus) corrupt)]
              (.remove-blobstore-key! storm-cluster-state blob-key))))))
    ```
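
The core of the Clojure function above is a set difference: topologies that are active in ZooKeeper minus the topology ids found in the blob store. A minimal Java sketch of just that step follows, using plain JDK collections. The class and method names are hypothetical, and the follow-up removal from cluster state and the blob store is deliberately left out.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

final class CorruptTopologiesSketch {

    /** Returns the topologies that have cluster state but no code in the blob store. */
    static Set<String> corruptTopologies(Set<String> activeTopologies, Set<String> codeIds) {
        // Copy first so the caller's set is not modified.
        Set<String> corrupt = new TreeSet<>(activeTopologies);
        corrupt.removeAll(codeIds);
        return corrupt;
    }

    public static void main(String[] args) {
        Set<String> active = new HashSet<>(Arrays.asList("topo-1", "topo-2"));
        Set<String> codeIds = new HashSet<>(Arrays.asList("topo-1"));
        System.out.println(corruptTopologies(active, codeIds)); // prints [topo-2]
    }
}
```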



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r77868535
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,114 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static ClientBlobStore clientBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    +                throw new RuntimeException("" + command + " is not a supported admin command");
    +        }
    +
    +    }
    +
    +    private static void initialize() {
    +        Map conf = ConfigUtils.readStormConfig();
    +        ClientBlobStore clientBlobStore = Utils.getClientBlobStore(conf);
    +        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    +        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
    +        List<ACL> acls = null;
    +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
    +            acls = adminZkAcls();
    +        }
    +        try {
    +            IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.UNKNOWN));
    --- End diff --
    
    This should be `DaemonType.NIMBUS`, since we are pretending to be nimbus.  For now it should not make much of a difference, but in the future it might.



[GitHub] storm issue #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1677
  
    +1, but please upmerge first.  There is a very minor merge conflict.  Then ping me and I'll merge it in.



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r79898747
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,133 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.BlobStore;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.blobstore.KeyFilter;
    +import org.apache.storm.blobstore.LocalFsBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.nimbus.NimbusInfo;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static BlobStore nimbusBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    --- End diff --
    
    Also, the exception itself is surrounded by lots and lots of log messages, which makes it hard to see what is happening or why a command failed.  It might be nice to print some things out with white space around them to help distinguish what is happening.
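
As a rough illustration of the whitespace suggestion, the sketch below pads a failure message with blank lines so that it stands apart from surrounding log output. The `banner` helper and the `ERROR:` prefix are hypothetical choices, not something taken from the pull request.

```java
final class ErrorBannerSketch {

    /** Wraps a message in blank lines so it stands out from nearby log lines. */
    static String banner(String message) {
        String sep = System.lineSeparator();
        return sep + sep + "ERROR: " + message + sep + sep;
    }

    public static void main(String[] args) {
        // Written to stderr, where a failed command's message would normally go.
        System.err.print(banner("foo is not a supported admin command"));
    }
}
```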



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r77868697
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,114 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static ClientBlobStore clientBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    +                throw new RuntimeException("" + command + " is not a supported admin command");
    +        }
    +
    +    }
    +
    +    private static void initialize() {
    +        Map conf = ConfigUtils.readStormConfig();
    +        ClientBlobStore clientBlobStore = Utils.getClientBlobStore(conf);
    +        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    +        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
    +        List<ACL> acls = null;
    +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
    +            acls = adminZkAcls();
    +        }
    +        try {
    +            IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.UNKNOWN));
    +        } catch (Exception e) {
    +            LOG.error("admin can't create stormClusterState");
    +            throw Utils.wrapInRuntime(e);
    --- End diff --
    
    We originally wrapped things in runtime exceptions because Clojure does not honor or support checked exception declarations.  Here we are in pure Java, so there is no need to wrap things in a RuntimeException.
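
    To illustrate the point in plain Java, here is a small sketch.  The class and method names and the use of `IOException` are assumptions for the example and do not come from the PR.  A method declares the checked exception and lets it propagate instead of wrapping it.

    ```java
    import java.io.IOException;

    class CheckedExceptionSketch {

        // Stand-in for a call that can fail with a checked exception.
        static String connect(boolean fail) throws IOException {
            if (fail) {
                throw new IOException("cannot create cluster state");
            }
            return "connected";
        }

        // A pure Java caller can declare the exception and let it propagate,
        // so no RuntimeException wrapper is needed.
        static String initialize(boolean fail) throws IOException {
            return connect(fail);
        }

        public static void main(String[] args) throws Exception {
            System.out.println(initialize(false));
            try {
                initialize(true);
            } catch (IOException e) {
                // The original exception type and message are preserved.
                System.out.println("failed: " + e.getMessage());
            }
        }
    }
    ```

    Since `main` in the quoted diff is already declared with `throws Exception`, a checked exception declared on `initialize()` could propagate through it unchanged.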



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by kamleshbhatt <gi...@git.apache.org>.
Github user kamleshbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r80296458
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,133 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.BlobStore;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.blobstore.KeyFilter;
    +import org.apache.storm.blobstore.LocalFsBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.nimbus.NimbusInfo;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static BlobStore nimbusBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    +                throw new RuntimeException("" + command + " is not a supported admin command");
    +        }
    +
    +    }
    +
    +    private static void initialize() {
    +        Map conf = ConfigUtils.readStormConfig();
    +        BlobStore nimbusBlobStore = Utils.getNimbusBlobStore (conf, NimbusInfo.fromConf(conf));
    +        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    +        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
    +        List<ACL> acls = null;
    +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
    +            acls = adminZkAcls();
    +        }
    +        try {
    +            IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.NIMBUS));
    +        } catch (Exception e) {
    +            LOG.error("admin can't create stormClusterState");
    +            new RuntimeException(e);
    +        }
    +        CuratorFramework zk = Zookeeper.mkClient(conf, servers, port, "", new DefaultWatcherCallBack(),conf);
    +    }
    +
    +    // we might think of moving this method in Utils class
    +    private static List<ACL> adminZkAcls() {
    +        final List<ACL> acls = new ArrayList<>();
    +        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
    +        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
    +        return acls;
    +    }
    +
    +    private static Set<String> getKeyListFromId( String corruptId) {
    +        Set<String> keyLists = new HashSet<>();
    +        keyLists.add(ConfigUtils.masterStormCodeKey(corruptId));
    +        keyLists.add(ConfigUtils.masterStormConfKey(corruptId));
    +        if(!ConfigUtils.isLocalMode(conf)) {
    +            ConfigUtils.masterStormJarKey(corruptId);
    +        }
    +        return keyLists;
    +    }
    +
    +    private static void removeCorruptTopologies( ) {
    +        Iterator<String> corruptTopologies = listCorruptTopologies();
    +        while(corruptTopologies.hasNext()) {
    +            String corruptId = corruptTopologies.next();
    +            stormClusterState.removeStorm(corruptId);
    +            if(nimbusBlobStore instanceof LocalFsBlobStore) {
    +                Iterator<String> blobKeys = getKeyListFromId(corruptId).iterator();
    +                while(blobKeys.hasNext()) {
    +                    stormClusterState.removeBlobstoreKey(blobKeys.next());
    +                }
    +            }
    +        }
    +    }
    +
    +    private static Iterator<String> listCorruptTopologies() {
    +        Set<String> blobStoreTopologyIds = nimbusBlobStore.filterAndListKeys(new KeyFilter<String>() {
    --- End diff --
    
    I am unable to test this out.  Could you please provide some pointers on how I can set up a test environment to investigate this issue?




[GitHub] storm issue #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1677
  
    Looks good to me.  I have a few tests that I want to run, but in general it looks good.



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1677



[GitHub] storm issue #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1677
  
    I am done going through the code.  Beyond this, we need to update storm.py to add this command and write documentation on how to use it.



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r79898972
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,133 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.BlobStore;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.blobstore.KeyFilter;
    +import org.apache.storm.blobstore.LocalFsBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.nimbus.NimbusInfo;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static BlobStore nimbusBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    +                throw new RuntimeException("" + command + " is not a supported admin command");
    +        }
    +
    +    }
    +
    +    private static void initialize() {
    +        Map conf = ConfigUtils.readStormConfig();
    +        BlobStore nimbusBlobStore = Utils.getNimbusBlobStore (conf, NimbusInfo.fromConf(conf));
    +        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    +        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
    +        List<ACL> acls = null;
    +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
    +            acls = adminZkAcls();
    +        }
    +        try {
    +            IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.NIMBUS));
    +        } catch (Exception e) {
    +            LOG.error("admin can't create stormClusterState");
    +            new RuntimeException(e);
    +        }
    +        CuratorFramework zk = Zookeeper.mkClient(conf, servers, port, "", new DefaultWatcherCallBack(),conf);
    +    }
    +
    +    // we might think of moving this method in Utils class
    +    private static List<ACL> adminZkAcls() {
    +        final List<ACL> acls = new ArrayList<>();
    +        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
    +        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
    +        return acls;
    +    }
    +
    +    private static Set<String> getKeyListFromId( String corruptId) {
    +        Set<String> keyLists = new HashSet<>();
    +        keyLists.add(ConfigUtils.masterStormCodeKey(corruptId));
    +        keyLists.add(ConfigUtils.masterStormConfKey(corruptId));
    +        if(!ConfigUtils.isLocalMode(conf)) {
    +            ConfigUtils.masterStormJarKey(corruptId);
    +        }
    +        return keyLists;
    +    }
    +
    +    private static void removeCorruptTopologies( ) {
    +        Iterator<String> corruptTopologies = listCorruptTopologies();
    +        while(corruptTopologies.hasNext()) {
    +            String corruptId = corruptTopologies.next();
    +            stormClusterState.removeStorm(corruptId);
    +            if(nimbusBlobStore instanceof LocalFsBlobStore) {
    +                Iterator<String> blobKeys = getKeyListFromId(corruptId).iterator();
    +                while(blobKeys.hasNext()) {
    +                    stormClusterState.removeBlobstoreKey(blobKeys.next());
    +                }
    +            }
    +        }
    +    }
    +
    +    private static Iterator<String> listCorruptTopologies() {
    +        Set<String> blobStoreTopologyIds = nimbusBlobStore.filterAndListKeys(new KeyFilter<String>() {
    --- End diff --
    
    I am getting an NPE here when I run this.  Not totally sure why.
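
    One possible cause, offered as a guess and not a confirmed diagnosis: in the quoted diff, `initialize()` declares local variables named `conf`, `nimbusBlobStore`, `stormClusterState` and `zk`, which shadow the static fields of the same names, so the static fields are never assigned.  The sketch below reproduces that pattern with a plain `String` field.  The class name and values are made up for illustration.

    ```java
    class ShadowingSketch {

        private static String store; // stays null if only a local is assigned

        // Mirrors the pattern in the quoted initialize(): the type name in
        // front of the assignment declares a new local that shadows the field.
        static void initializeShadowed() {
            String store = "nimbus-blob-store";
        }

        // Assigning without a type name writes to the static field itself.
        static void initializeField() {
            store = "nimbus-blob-store";
        }

        static boolean isInitialized() {
            return store != null;
        }

        public static void main(String[] args) {
            initializeShadowed();
            System.out.println("after shadowed init: " + isInitialized()); // false
            initializeField();
            System.out.println("after field init: " + isInitialized());    // true
        }
    }
    ```

    If this is what is happening, dereferencing `nimbusBlobStore` in `listCorruptTopologies()` would fail because the static field is still null.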



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r77870908
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,114 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static ClientBlobStore clientBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    +                throw new RuntimeException("" + command + " is not a supported admin command");
    +        }
    +
    +    }
    +
    +    private static void initialize() {
    +        Map conf = ConfigUtils.readStormConfig();
    +        ClientBlobStore clientBlobStore = Utils.getClientBlobStore(conf);
    +        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    +        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
    +        List<ACL> acls = null;
    +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
    +            acls = adminZkAcls();
    +        }
    +        try {
    +            IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.UNKNOWN));
    +        } catch (Exception e) {
    +            LOG.error("admin can't create stormClusterState");
    +            throw Utils.wrapInRuntime(e);
    +        }
    +        CuratorFramework zk = Zookeeper.mkClient(conf, servers, port, "", new DefaultWatcherCallBack(),conf);
    +    }
    +
    +    // we might think of moving this method in Utils class
    +    private static List<ACL> adminZkAcls() {
    +        final List<ACL> acls = new ArrayList<>();
    +        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
    +        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
    +        return acls;
    +    }
    +
    +    private static void removeCorruptTopologies( ) {
    +        Iterator<String> corruptTopologies = listCorruptTopologies();
    +        while(corruptTopologies.hasNext()) {
    +            stormClusterState.removeStorm(corruptTopologies.next());
    +        }
    +    }
    +
    +    private static Iterator<String> listCorruptTopologies() {
    +        Iterator<String> blobStoreTopologyIds = clientBlobStore.listKeys();
    +        Set<String> activeTopologyIds = new HashSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
    +        HashSet<String> blobTopologyIds = Sets.newHashSet(blobStoreTopologyIds);
    +        Sets.SetView<String> diffTopology = Sets.difference(activeTopologyIds, blobTopologyIds);
    +        LOG.info("active-topology-ids [{}] blob-topology-ids [{}] diff-topology [{}]",
    +                generateJoinedString(activeTopologyIds), generateJoinedString(blobTopologyIds),
    +                generateJoinedString(diffTopology));
    +        return diffTopology.iterator();
    +    }
    +
    +    private static String generateJoinedString(Set<String> activeTopologyIds) {
    --- End diff --
    
    I'm not sure we need this.  The toString of most sets already returns this information.
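
    A short sketch of what the standard `toString()` produces.  The class name and topology IDs are made up for illustration.  The JDK collection classes inherit `AbstractCollection.toString()`, which renders the elements as a bracketed, comma-separated list.

    ```java
    import java.util.Set;
    import java.util.TreeSet;

    class SetToStringSketch {

        static String describe(Set<String> ids) {
            // String concatenation calls ids.toString(), which already
            // yields "[a, b, c]" for the JDK set implementations.
            return "active-topology-ids " + ids;
        }

        public static void main(String[] args) {
            Set<String> ids = new TreeSet<>();
            ids.add("topo-2");
            ids.add("topo-1");
            System.out.println(describe(ids)); // active-topology-ids [topo-1, topo-2]
        }
    }
    ```

    SLF4J's `{}` placeholder formats a non-array argument through its `toString()` as well, so passing the sets straight to `LOG.info` should give comparable output without a join helper.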



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r79897541
  
    --- Diff: bin/storm.py ---
    @@ -572,6 +572,22 @@ def kill_workers(*args):
             jvmtype="-client",
             extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")])
     
    +def admin_command(*args):
    --- End diff --
    
    Can we rename this to just admin instead of admin_command?



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r77867791
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,114 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static ClientBlobStore clientBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    +                throw new RuntimeException("" + command + " is not a supported admin command");
    +        }
    +
    +    }
    +
    +    private static void initialize() {
    +        Map conf = ConfigUtils.readStormConfig();
    +        ClientBlobStore clientBlobStore = Utils.getClientBlobStore(conf);
    --- End diff --
    
    The original code produced a nimbus blob store, not a client blob store.
    ```
    blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))
    ```
    
    The nimbus blob store gives you superuser privileges, and it will still work when nimbus is down.



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r77870589
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,114 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static ClientBlobStore clientBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    +                throw new RuntimeException("" + command + " is not a supported admin command");
    +        }
    +
    +    }
    +
    +    private static void initialize() {
    +        Map conf = ConfigUtils.readStormConfig();
    +        ClientBlobStore clientBlobStore = Utils.getClientBlobStore(conf);
    +        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    +        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
    +        List<ACL> acls = null;
    +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
    +            acls = adminZkAcls();
    +        }
    +        try {
    +            IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.UNKNOWN));
    +        } catch (Exception e) {
    +            LOG.error("admin can't create stormClusterState");
    +            throw Utils.wrapInRuntime(e);
    +        }
    +        CuratorFramework zk = Zookeeper.mkClient(conf, servers, port, "", new DefaultWatcherCallBack(),conf);
    +    }
    +
    +    // we might think of moving this method in Utils class
    +    private static List<ACL> adminZkAcls() {
    +        final List<ACL> acls = new ArrayList<>();
    +        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
    +        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
    +        return acls;
    +    }
    +
    +    private static void removeCorruptTopologies( ) {
    +        Iterator<String> corruptTopologies = listCorruptTopologies();
    +        while(corruptTopologies.hasNext()) {
    +            stormClusterState.removeStorm(corruptTopologies.next());
    +        }
    +    }
    +
    +    private static Iterator<String> listCorruptTopologies() {
    +        Iterator<String> blobStoreTopologyIds = clientBlobStore.listKeys();
    +        Set<String> activeTopologyIds = new HashSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
    --- End diff --
    
    Let's not go to ZooKeeper directly; that bypasses the IStormClusterState abstraction.
    
    ```
            Set<String> activeTopologyIds = new HashSet<>(stormClusterState.activeStorms());
    ```
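
The suggestion above amounts to a set difference: take the active topology ids from cluster state and drop every id that still has code in the blob store. Below is a minimal sketch of that idea, assuming a hypothetical `ActiveStormsSource` interface standing in for the single `activeStorms()` call on `IStormClusterState`; the class and method names are illustrative and not part of the PR.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the one IStormClusterState call used here.
interface ActiveStormsSource {
    List<String> activeStorms();
}

class CorruptTopologyFinder {
    // Treats a topology as corrupt when it is active in cluster state
    // but its id is missing from the ids found in the blob store.
    static Set<String> findCorrupt(ActiveStormsSource state,
                                   Set<String> blobStoreTopologyIds) {
        Set<String> corrupt = new TreeSet<>(state.activeStorms());
        corrupt.removeAll(blobStoreTopologyIds);
        return corrupt;
    }

    public static void main(String[] args) {
        ActiveStormsSource state = () -> Arrays.asList("topo-1", "topo-2", "topo-3");
        Set<String> inBlobStore = new HashSet<>(Arrays.asList("topo-1", "topo-3"));
        System.out.println(findCorrupt(state, inBlobStore)); // prints [topo-2]
    }
}
```

Going through the interface keeps the ZooKeeper path layout out of the admin tool, which is the point of the review comment.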



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by kamleshbhatt <gi...@git.apache.org>.
Github user kamleshbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r80293802
  
    --- Diff: bin/storm.py ---
    @@ -572,6 +572,22 @@ def kill_workers(*args):
             jvmtype="-client",
             extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")])
     
    +def admin_command(*args):
    +    """Syntax: [storm admin_command cmd]
    +
    +    This is a proxy of nimbus and allow to execute admin commands. As of now it supports
    +    command to remove corrupt topologies.
    +    Nimbus doesn't clean up corrupted topologies automatically. This command should clean
    +    up corrupt topologies i.e.topologies whose codes are not available on blobstore.
    +    In future this command would support more admin commnds.
    +    """
    --- End diff --
    
    Done.



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r79897599
  
    --- Diff: bin/storm.py ---
    @@ -572,6 +572,22 @@ def kill_workers(*args):
             jvmtype="-client",
             extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")])
     
    +def admin_command(*args):
    +    """Syntax: [storm admin_command cmd]
    +
    +    This is a proxy of nimbus and allow to execute admin commands. As of now it supports
    +    command to remove corrupt topologies.
    +    Nimbus doesn't clean up corrupted topologies automatically. This command should clean
    +    up corrupt topologies i.e.topologies whose codes are not available on blobstore.
    +    In future this command would support more admin commnds.
    +    """
    --- End diff --
    
    Can we list the subcommands here?



[GitHub] storm issue #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/1677
  
    @kamleshbhatt Sorry, I lost track of this; looking again now.



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r77869312
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,114 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static ClientBlobStore clientBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    +                throw new RuntimeException("" + command + " is not a supported admin command");
    +        }
    +
    +    }
    +
    +    private static void initialize() {
    +        Map conf = ConfigUtils.readStormConfig();
    +        ClientBlobStore clientBlobStore = Utils.getClientBlobStore(conf);
    +        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    +        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
    +        List<ACL> acls = null;
    +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
    +            acls = adminZkAcls();
    +        }
    +        try {
    +            IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.UNKNOWN));
    +        } catch (Exception e) {
    +            LOG.error("admin can't create stormClusterState");
    +            throw Utils.wrapInRuntime(e);
    +        }
    +        CuratorFramework zk = Zookeeper.mkClient(conf, servers, port, "", new DefaultWatcherCallBack(),conf);
    +    }
    +
    +    // we might think of moving this method in Utils class
    +    private static List<ACL> adminZkAcls() {
    +        final List<ACL> acls = new ArrayList<>();
    +        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
    +        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
    +        return acls;
    +    }
    +
    +    private static void removeCorruptTopologies( ) {
    +        Iterator<String> corruptTopologies = listCorruptTopologies();
    +        while(corruptTopologies.hasNext()) {
    +            stormClusterState.removeStorm(corruptTopologies.next());
    --- End diff --
    
    In the original code we also deleted the blobs.
    
    ```
          (if (instance? LocalFsBlobStore blob-store)
            (doseq [blob-key (get-key-list-from-id (:conf nimbus) corrupt)]
              (.remove-blobstore-key! storm-cluster-state blob-key))))))
    ```
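
A Java counterpart of the Clojure snippet above might look like the sketch below. It is written against a hypothetical `ClusterStateOps` interface rather than the real `IStormClusterState`, and the three key suffixes in `blobKeysFor` are an assumption about how topology blob keys are named; verify both against the Storm source before relying on them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical subset of the cluster-state operations needed for cleanup.
interface ClusterStateOps {
    void removeStorm(String topologyId);
    void removeBlobstoreKey(String blobKey);
}

class CorruptTopologyCleaner {
    // Assumed naming scheme for the jar, code and conf blobs of a topology.
    static List<String> blobKeysFor(String topologyId) {
        return Arrays.asList(
            topologyId + "-stormjar.jar",
            topologyId + "-stormcode.ser",
            topologyId + "-stormconf.ser");
    }

    // Removes each corrupt topology and, for a local-FS blob store only,
    // the blob key entries that cluster state tracks for it.
    static void clean(ClusterStateOps state, Iterable<String> corruptIds,
                      boolean localFsBlobStore) {
        for (String id : corruptIds) {
            state.removeStorm(id);
            if (localFsBlobStore) {
                for (String key : blobKeysFor(id)) {
                    state.removeBlobstoreKey(key);
                }
            }
        }
    }

    public static void main(String[] args) {
        final List<String> log = new ArrayList<>();
        ClusterStateOps recording = new ClusterStateOps() {
            public void removeStorm(String id) { log.add("removeStorm " + id); }
            public void removeBlobstoreKey(String key) { log.add("removeBlobstoreKey " + key); }
        };
        clean(recording, Arrays.asList("topo-2"), true);
        for (String line : log) {
            System.out.println(line);
        }
    }
}
```

The `localFsBlobStore` flag mirrors the `instance? LocalFsBlobStore` check in the original code: the blob key entries are only removed when that blob store implementation is in use.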



[GitHub] storm issue #1677: STORM-1985: Admin Commands

Posted by kamleshbhatt <gi...@git.apache.org>.
Github user kamleshbhatt commented on the issue:

    https://github.com/apache/storm/pull/1677
  
    @revans2 Thanks for reviewing the changes. Resolved the merge conflict.



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r79897811
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,133 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.BlobStore;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.blobstore.KeyFilter;
    +import org.apache.storm.blobstore.LocalFsBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.nimbus.NimbusInfo;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static BlobStore nimbusBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    --- End diff --
    
    Can we list the subcommands here before throwing the exception?
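
One way to do this is to keep the commands in a map so the error message can enumerate them. The sketch below is illustrative only: `AdminCommandDispatcher` and its methods are hypothetical names, not code from the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class AdminCommandDispatcher {
    // Insertion-ordered so the usage text lists commands as registered.
    private final Map<String, Runnable> commands = new LinkedHashMap<>();

    void register(String name, Runnable action) {
        commands.put(name, action);
    }

    String usage() {
        return "Supported admin commands: " + String.join(", ", commands.keySet());
    }

    // Runs the named command, or fails with a message listing what is supported.
    void run(String command) {
        Runnable action = commands.get(command);
        if (action == null) {
            throw new IllegalArgumentException(
                command + " is not a supported admin command. " + usage());
        }
        action.run();
    }

    public static void main(String[] args) {
        AdminCommandDispatcher dispatcher = new AdminCommandDispatcher();
        dispatcher.register("remove_corrupt_topologies",
            () -> System.out.println("removing corrupt topologies"));
        try {
            dispatcher.run("bogus");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a map, adding a new admin command updates the dispatch and the help text in one place, instead of keeping a `switch` and a separate usage string in sync.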



[GitHub] storm pull request #1677: STORM-1985: Admin Commands

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1677#discussion_r79898829
  
    --- Diff: storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---
    @@ -0,0 +1,133 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.command;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Sets;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.storm.Config;
    +import org.apache.storm.blobstore.BlobStore;
    +import org.apache.storm.blobstore.ClientBlobStore;
    +import org.apache.storm.blobstore.KeyFilter;
    +import org.apache.storm.blobstore.LocalFsBlobStore;
    +import org.apache.storm.callback.DefaultWatcherCallBack;
    +import org.apache.storm.cluster.ClusterStateContext;
    +import org.apache.storm.cluster.ClusterUtils;
    +import org.apache.storm.cluster.DaemonType;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.nimbus.NimbusInfo;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.zookeeper.Zookeeper;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.utils.ConfigUtils;
    +
    +import java.util.*;
    +
    +public class AdminCommands {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Deactivate.class);
    +    private static BlobStore nimbusBlobStore;
    +    private static IStormClusterState stormClusterState;
    +    private static CuratorFramework zk;
    +    private static Map conf;
    +
    +    public static void main(String [] args) throws Exception {
    +
    +        if (args.length == 0) {
    +            throw new IllegalArgumentException("Missing command.");
    +        }
    +        initialize();
    +        String command = args[0];
    +        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
    +        switch (command) {
    +            case "remove_corrupt_topologies":
    +                removeCorruptTopologies();
    +                break;
    +            default:
    --- End diff --
    
    Or don't initialize until you know which command you are going to run.
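
Deferring initialization can be sketched with a memoizing supplier, so that an unknown command fails before any connection is opened. Everything here is hypothetical scaffolding: `Lazy` and `LazyInitDemo` are illustrative names, and the string "cluster-state" stands in for the real `IStormClusterState` handle.

```java
import java.util.function.Supplier;

// Builds the wrapped value on first use and caches it afterwards.
class Lazy<T> implements Supplier<T> {
    private final Supplier<T> factory;
    private T value;
    private boolean initialized;

    Lazy(Supplier<T> factory) {
        this.factory = factory;
    }

    @Override
    public synchronized T get() {
        if (!initialized) {
            value = factory.get();
            initialized = true;
        }
        return value;
    }
}

class LazyInitDemo {
    // Only a recognized command touches the supplier, so an unknown
    // command is rejected without paying the initialization cost.
    static String run(String command, Supplier<String> clusterState) {
        switch (command) {
            case "remove_corrupt_topologies":
                return "cleaning via " + clusterState.get();
            default:
                throw new IllegalArgumentException(
                    command + " is not a supported admin command");
        }
    }

    public static void main(String[] args) {
        Lazy<String> state = new Lazy<>(() -> {
            System.out.println("initializing cluster state");
            return "cluster-state";
        });
        System.out.println(run("remove_corrupt_topologies", state));
    }
}
```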


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1677: STORM-1985: Admin Commands

Posted by kamleshbhatt <gi...@git.apache.org>.
Github user kamleshbhatt commented on the issue:

    https://github.com/apache/storm/pull/1677
  
    Applied the review comments. Please take a look.

