You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/03/20 22:01:17 UTC

[12/50] [abbrv] storm git commit: defaulting to LocalFileSystemCodeDistributor, moving all code distributors to their own packages.

defaulting to LocalFileSystemCodeDistributor, moving all code distributors to their own packages.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/547ed491
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/547ed491
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/547ed491

Branch: refs/heads/0.11.x-branch
Commit: 547ed4916761508040f056c6812e047e04e8db3a
Parents: e5a96f4
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Dec 18 16:39:25 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Thu Dec 18 16:39:25 2014 -0800

----------------------------------------------------------------------
 conf/defaults.yaml                              |   2 +-
 .../ha/codedistributor/HDFSCodeDistributor.java |   2 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   2 +
 storm-core/src/jvm/backtype/storm/Config.java   |   2 +-
 .../BitTorrentCodeDistributor.java              | 191 +++++++++++++++++++
 .../storm/codedistributor/ICodeDistributor.java |  57 ++++++
 .../LocalFileSystemCodeDistributor.java         | 105 ++++++++++
 .../backtype/storm/nimbus/ICodeDistributor.java |  57 ------
 .../nimbus/LocalFileSystemCodeDistributor.java  | 106 ----------
 .../torrent/BitTorrentCodeDistributor.java      | 191 -------------------
 10 files changed, 358 insertions(+), 357 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 7757b24..07230dc 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -48,7 +48,7 @@ storm.auth.simple-acl.users: []
 storm.auth.simple-acl.users.commands: []
 storm.auth.simple-acl.admins: []
 storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"
-storm.codedistributor.class: "backtype.storm.torrent.BitTorrentCodeDistributor"
+storm.codedistributor.class: "backtype.storm.codedistributor.LocalFileSystemCodeDistributor"
 
 ### bittorrent configuration
 bittorrent.port: 6969

http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java
index 9935ae7..f6736bd 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java
@@ -1,6 +1,6 @@
 package org.apache.storm.hdfs.ha.codedistributor;
 
-import backtype.storm.nimbus.ICodeDistributor;
+import backtype.storm.codedistributor.ICodeDistributor;
 import com.google.common.collect.Lists;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.Validate;

http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index fbe8e77..5a3fa12 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -65,6 +65,8 @@
 (defmulti sync-code cluster-mode)
 
 ;;TODO we should try genclass for zkLeaderElector and just set NIMBUS-LEADER-ELECTOR-CLASS in defaults.yaml
+;;TODO we need to pass ACLs; this looks like it is not possible, as leader-latch does not work with ACLs
+;;TODO we need to call .prepare or just get rid of the interface altogether.
 (defn mk-leader-elector [conf]
   (if (conf NIMBUS-LEADER-ELECTOR-CLASS)
     (do (log-message "Using custom Leade elector: " (conf NIMBUS-LEADER-ELECTOR-CLASS))

http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 16216ea..81046b3 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1257,7 +1257,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_HDFS_URI_SCHEMA = String.class;
 
     /**
-     * Which implementation of {@link backtype.storm.nimbus.ICodeDistributor} should be used by storm for code
+     * Which implementation of {@link backtype.storm.codedistributor.ICodeDistributor} should be used by storm for code
      * distribution.
      */
     public static final String STORM_CODE_DISTRIBUTOR_CLASS = "storm.codedistributor.class";

http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java
new file mode 100644
index 0000000..190cc5f
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java
@@ -0,0 +1,191 @@
+package backtype.storm.codedistributor;
+
+import backtype.storm.Config;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Shorts;
+import com.turn.ttorrent.client.Client;
+import com.turn.ttorrent.client.SharedTorrent;
+import com.turn.ttorrent.common.Torrent;
+import com.turn.ttorrent.tracker.TrackedTorrent;
+import com.turn.ttorrent.tracker.Tracker;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.*;
+
+public class BitTorrentCodeDistributor implements ICodeDistributor {
+    private static final Logger LOG = LoggerFactory.getLogger(BitTorrentCodeDistributor.class);
+    private Tracker tracker;
+    private String hostName;
+    private InetSocketAddress address;
+    private Integer port;
+    protected HashMap<String, Client> clients = new HashMap<String, Client>();
+    protected Double maxDownload;
+    protected Double maxUpload;
+    private Integer seedDuration;
+
+    @Override
+    public void prepare(Map conf) throws Exception {
+        this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
+        this.port = (Integer) conf.get(Config.BITTORRENT_PORT);
+        this.maxDownload = (Double) conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE);
+        this.maxUpload = (Double) conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE);
+        this.seedDuration = (Integer) conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION);
+
+        LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxUpload), format(this.maxDownload));
+
+        LOG.info("Starting bt tracker bound to hostname '{}'", hostName);
+        //using "0.0.0.0" to ensure we bind to all IPV4 network interfaces.
+        this.address = new InetSocketAddress("0.0.0.0", port);
+
+        this.tracker = new Tracker(address);
+        LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl());
+        this.tracker.start();
+    }
+
+    @Override
+    public File upload(String dirPath, String topologyId) throws Exception {
+        File destDir = new File(dirPath);
+        LOG.info("Generating torrent for directory: {}", destDir.getAbsolutePath());
+
+        URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce");
+        LOG.info("Creating torrent with announce URL: {}", uri);
+
+        //TODO: why does listing the directory not work?
+        ArrayList<File> files = new ArrayList<File>();
+        files.add(new File(destDir, "stormjar.jar"));
+        files.add(new File(destDir, "stormconf.ser"));
+        files.add(new File(destDir, "stormcode.ser"));
+
+        Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus");
+        File torrentFile = new File(destDir, "storm-code-distributor.meta");
+        torrent.save(new FileOutputStream(torrentFile));
+        LOG.info("Saved torrent: {}", torrentFile.getAbsolutePath());
+        this.tracker.announce(new TrackedTorrent(torrent));
+
+        Client client = new Client(getInetAddress(), new SharedTorrent(torrent, destDir.getParentFile(), true));
+        this.clients.put(topologyId, client);
+        rebalanceRates();
+        client.share();
+        LOG.info("Seeding torrent...");
+
+        /**
+         *
+         * TODO: On every prepare we need to call tracker.announce for all torrents that
+         * exist in the file system; otherwise the tracker will reject any peer request
+         * for unknown torrents. The trackers need to be bootstrapped.
+         */
+        return torrentFile;
+    }
+
+    @Override
+    public List<File> download(String topologyId, File torrentFile) throws Exception {
+        LOG.info("Initiating BitTorrent download.");
+
+        File destDir = torrentFile.getParentFile();
+        LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath());
+        LOG.info("Saving files to directory: {}", destDir.getAbsolutePath());
+        SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir);
+        Client client = new Client(getInetAddress(), st);
+        this.clients.put(topologyId, client);
+        rebalanceRates();
+        client.share(this.seedDuration);
+
+        //TODO: Should have a timeout after which we just fail the supervisor.
+        if (this.seedDuration == 0) {
+            client.waitForCompletion();
+        } else {
+            LOG.info("Waiting for seeding to begin...");
+            while (client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR) {
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+        LOG.info("BitTorrent download complete.");
+
+        /**
+         * This should not be needed. Currently the bittorrent library uses the torrent name (which is topologyId)
+         * as the folder name and downloads all the files under that folder, so we need to download
+         * the torrent files under either /storm-local/supervisor/stormdist or nimbus/stormdist/ to ensure stormdist becomes
+         * the parent of all torrent files and the actual code will be downloaded under stormdist/topologyId/.
+         * Ideally we should be able to specify that the downloaded files must be downloaded under
+         * the given folder only and that no extra folder needs to be created.
+         */
+
+        File srcDir = new File(destDir, topologyId);
+        for (File file : srcDir.listFiles()) {
+            FileUtils.copyFileToDirectory(file, destDir);
+            file.delete();
+        }
+        srcDir.delete();
+
+        return Lists.newArrayList(destDir.listFiles());
+    }
+
+    private InetAddress getInetAddress() throws UnknownHostException {
+        for (InetAddress addr : InetAddress.getAllByName(this.hostName)) {
+            if (!addr.isAnyLocalAddress() && !addr.isLoopbackAddress() && !addr.isMulticastAddress()) {
+                return addr;
+            }
+        }
+
+        throw new RuntimeException("No valid InetAddress could be obtained, something really wrong with network configuration.");
+    }
+
+    @Override
+    public short getReplicationCount(String topologyId) {
+        Collection<TrackedTorrent> trackedTorrents = tracker.getTrackedTorrents();
+        for (final TrackedTorrent trackedTorrent : trackedTorrents) {
+            if (trackedTorrent.getName().equals(topologyId)) {
+                return Shorts.checkedCast(trackedTorrent.seeders());
+            }
+        }
+
+        LOG.warn("No torrent found in tracker for topologyId = " + topologyId);
+        return 0;
+    }
+
+    @Override
+    public void cleanup(String topologyId) {
+        LOG.info("Stop seeding/tracking for topology {}", topologyId);
+        Client client = this.clients.remove(topologyId);
+        if (client != null) {
+            Torrent torrent = client.getTorrent();
+            client.stop();
+            this.tracker.remove(torrent);
+        }
+        rebalanceRates();
+    }
+
+    @Override
+    public void close(Map conf) {
+        this.tracker.stop();
+    }
+
+    private synchronized void rebalanceRates() {
+        int clientCount = this.clients.size();
+        if (clientCount > 0) {
+            double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount;
+            double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount;
+            LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount);
+            LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl));
+            for (Client client : this.clients.values()) {
+                client.setMaxDownloadRate(maxDl);
+                client.setMaxUploadRate(maxUl);
+            }
+        }
+    }
+
+    private static String format(double val) {
+        return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java
new file mode 100644
index 0000000..f536a2a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java
@@ -0,0 +1,57 @@
+package backtype.storm.codedistributor;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interface responsible for distributing code in the cluster.
+ */
+public interface ICodeDistributor {
+    /**
+     * Prepare this code distributor.
+     * @param conf
+     */
+    void prepare(Map conf) throws Exception;
+
+    /**
+     * This API will perform the actual upload of the code to the distribution implementation.
+     * The API should return a Meta file which should have enough information for downloader
+     * so it can download the code e.g. for bittorrent it will be a torrent file, in case of something like HDFS or s3
+     * it might have the actual directory where all the code is put.
+     * @param dirPath directory where all the code to be distributed exists.
+     * @param topologyId the topologyId for which the meta file needs to be created.
+     * @return metaFile
+     */
+    File upload(String dirPath, String topologyId) throws Exception;
+
+    /**
+     * Given the topologyId and metafile, download the actual code and return the list of downloaded files.
+     * @param topologyid
+     * @param metafile
+     * @return
+     */
+    List<File> download(String topologyid, File metafile) throws Exception;
+
+    /**
+     * Returns the number of nodes to which the code is already replicated for the topology.
+     * @param topologyId
+     * @return
+     */
+    short getReplicationCount(String topologyId) throws Exception;
+
+    /**
+     * Performs the cleanup.
+     * @param topologyid
+     */
+    void cleanup(String topologyid) throws IOException;
+
+    /**
+     * Close this distributor.
+     * @param conf
+     */
+    void close(Map conf);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java
new file mode 100644
index 0000000..96422e2
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java
@@ -0,0 +1,105 @@
+package backtype.storm.codedistributor;
+
+import backtype.storm.utils.ZookeeperAuthInfo;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static backtype.storm.Config.*;
+import static backtype.storm.utils.Utils.downloadFromHost;
+import static backtype.storm.utils.Utils.newCurator;
+
+
+public class LocalFileSystemCodeDistributor implements ICodeDistributor {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystemCodeDistributor.class);
+    private CuratorFramework zkClient;
+    private Map conf;
+
+    @Override
+    public void prepare(Map conf) throws Exception {
+        this.conf = conf;
+        List<String> zkServers = (List<String>) conf.get(STORM_ZOOKEEPER_SERVERS);
+        int port = (Integer) conf.get(STORM_ZOOKEEPER_PORT);
+        ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
+        zkClient = newCurator(conf, zkServers, port, (String) conf.get(STORM_ZOOKEEPER_ROOT), zkAuthInfo);
+        zkClient.start();
+    }
+
+    @Override
+    public File upload(String dirPath, String topologyId) throws Exception {
+        ArrayList<File> files = new ArrayList<File>();
+        File destDir = new File(dirPath);
+        File[] localFiles = destDir.listFiles();
+
+        List<String> filePaths = new ArrayList<String>(3);
+        for (File file : localFiles) {
+            filePaths.add(file.getAbsolutePath());
+        }
+
+        File metaFile = new File(destDir, "storm-code-distributor.meta");
+        boolean isCreated = metaFile.createNewFile();
+        if (isCreated) {
+            FileUtils.writeLines(metaFile, filePaths);
+        } else {
+            LOG.warn("metafile " + metaFile.getAbsolutePath() + " already exists.");
+        }
+
+        LOG.info("Created meta file " + metaFile.getAbsolutePath() + " upload successful.");
+
+        return metaFile;
+    }
+
+    @Override
+    public List<File> download(String topologyid, File metafile) throws Exception {
+        List<String> hostInfos = zkClient.getChildren().forPath("/code-distributor/" + topologyid);
+        File destDir = metafile.getParentFile();
+        List<File> downloadedFiles = Lists.newArrayList();
+        for (String absoluteFilePath : FileUtils.readLines(metafile)) {
+
+            File localFile = new File(destDir, new File(absoluteFilePath).getName());
+
+            boolean isSuccess = false;
+            for (String hostAndPort : hostInfos) {
+                String host = hostAndPort.split(":")[0];
+                int port = Integer.parseInt(hostAndPort.split(":")[1]);
+                try {
+                    downloadFromHost(conf, absoluteFilePath, localFile.getAbsolutePath(), host, port);
+                    downloadedFiles.add(localFile);
+                    isSuccess = true;
+                    break;
+                } catch (Exception e) {
+                    LOG.error("download failed from {}:{}, will try another endpoint ", host, port, e);
+                }
+            }
+
+            if(!isSuccess) {
+                throw new RuntimeException("File " + absoluteFilePath +" could not be downloaded from any endpoint");
+            }
+        }
+
+        return downloadedFiles;
+    }
+
+    @Override
+    public short getReplicationCount(String topologyId) throws Exception {
+        return (short) zkClient.getChildren().forPath("/code-distributor/" + topologyId).size();
+    }
+
+    @Override
+    public void cleanup(String topologyid) throws IOException {
+        //no op.
+    }
+
+    @Override
+    public void close(Map conf) {
+       zkClient.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/nimbus/ICodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/nimbus/ICodeDistributor.java b/storm-core/src/jvm/backtype/storm/nimbus/ICodeDistributor.java
deleted file mode 100644
index 8189179..0000000
--- a/storm-core/src/jvm/backtype/storm/nimbus/ICodeDistributor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package backtype.storm.nimbus;
-
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Interface responsible to distribute code in the cluster.
- */
-public interface ICodeDistributor {
-    /**
-     * Prepare this code distributor.
-     * @param conf
-     */
-    void prepare(Map conf) throws Exception;
-
-    /**
-     * This API will perform the actual upload of the code to the distribution implementation.
-     * The API should return a Meta file which should have enough information for downloader
-     * so it can download the code e.g. for bittorrent it will be a torrent file, in case of something like HDFS or s3
-     * it might have the actual directory where all the code is put.
-     * @param dirPath directory where all the code to be distributed exists.
-     * @param topologyId the topologyId for which the meta file needs to be created.
-     * @return metaFile
-     */
-    File upload(String dirPath, String topologyId) throws Exception;
-
-    /**
-     * Given the topologyId and metafile, download the actual code and return the downloaded file's list.
-     * @param topologyid
-     * @param metafile
-     * @return
-     */
-    List<File> download(String topologyid, File metafile) throws Exception;
-
-    /**
-     * returns number of nodes to which the code is already replicated for the topology.
-     * @param topologyId
-     * @return
-     */
-    short getReplicationCount(String topologyId) throws Exception;
-
-    /**
-     * Performs the cleanup.
-     * @param topologyid
-     */
-    void cleanup(String topologyid) throws IOException;
-
-    /**
-     * Close this distributor.
-     * @param conf
-     */
-    void close(Map conf);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java b/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java
deleted file mode 100644
index bcf0167..0000000
--- a/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package backtype.storm.nimbus;
-
-import backtype.storm.Config;
-import backtype.storm.utils.ZookeeperAuthInfo;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static backtype.storm.Config.*;
-import static backtype.storm.utils.Utils.downloadFromHost;
-import static backtype.storm.utils.Utils.newCurator;
-
-
-public class LocalFileSystemCodeDistributor implements ICodeDistributor {
-    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystemCodeDistributor.class);
-    private CuratorFramework zkClient;
-    private Map conf;
-
-    @Override
-    public void prepare(Map conf) throws Exception {
-        this.conf = conf;
-        List<String> zkServers = (List<String>) conf.get(STORM_ZOOKEEPER_SERVERS);
-        int port = (Integer) conf.get(STORM_ZOOKEEPER_PORT);
-        ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
-        zkClient = newCurator(conf, zkServers, port, (String) conf.get(STORM_ZOOKEEPER_ROOT), zkAuthInfo);
-        zkClient.start();
-    }
-
-    @Override
-    public File upload(String dirPath, String topologyId) throws Exception {
-        ArrayList<File> files = new ArrayList<File>();
-        File destDir = new File(dirPath);
-        File[] localFiles = destDir.listFiles();
-
-        List<String> filePaths = new ArrayList<String>(3);
-        for (File file : localFiles) {
-            filePaths.add(file.getAbsolutePath());
-        }
-
-        File metaFile = new File(destDir, "storm-code-distributor.meta");
-        boolean isCreated = metaFile.createNewFile();
-        if (isCreated) {
-            FileUtils.writeLines(metaFile, filePaths);
-        } else {
-            LOG.warn("metafile " + metaFile.getAbsolutePath() + " already exists.");
-        }
-
-        LOG.info("Created meta file " + metaFile.getAbsolutePath() + " upload successful.");
-
-        return metaFile;
-    }
-
-    @Override
-    public List<File> download(String topologyid, File metafile) throws Exception {
-        List<String> hostInfos = zkClient.getChildren().forPath("/code-distributor/" + topologyid);
-        File destDir = metafile.getParentFile();
-        List<File> downloadedFiles = Lists.newArrayList();
-        for (String absoluteFilePath : FileUtils.readLines(metafile)) {
-
-            File localFile = new File(destDir, new File(absoluteFilePath).getName());
-
-            boolean isSuccess = false;
-            for (String hostAndPort : hostInfos) {
-                String host = hostAndPort.split(":")[0];
-                int port = Integer.parseInt(hostAndPort.split(":")[1]);
-                try {
-                    downloadFromHost(conf, absoluteFilePath, localFile.getAbsolutePath(), host, port);
-                    downloadedFiles.add(localFile);
-                    isSuccess = true;
-                    break;
-                } catch (Exception e) {
-                    LOG.error("download failed from {}:{}, will try another endpoint ", host, port, e);
-                }
-            }
-
-            if(!isSuccess) {
-                throw new RuntimeException("File " + absoluteFilePath +" could not be downloaded from any endpoint");
-            }
-        }
-
-        return downloadedFiles;
-    }
-
-    @Override
-    public short getReplicationCount(String topologyId) throws Exception {
-        return (short) zkClient.getChildren().forPath("/code-distributor/" + topologyId).size();
-    }
-
-    @Override
-    public void cleanup(String topologyid) throws IOException {
-        //no op.
-    }
-
-    @Override
-    public void close(Map conf) {
-       zkClient.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java b/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
deleted file mode 100644
index a0d2fc8..0000000
--- a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
+++ /dev/null
@@ -1,191 +0,0 @@
-package backtype.storm.torrent;
-
-import backtype.storm.Config;
-import backtype.storm.nimbus.ICodeDistributor;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Shorts;
-import com.turn.ttorrent.client.Client;
-import com.turn.ttorrent.client.SharedTorrent;
-import com.turn.ttorrent.common.Torrent;
-import com.turn.ttorrent.tracker.TrackedTorrent;
-import com.turn.ttorrent.tracker.Tracker;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.*;
-
-public class BitTorrentCodeDistributor implements ICodeDistributor {
-    private static final Logger LOG = LoggerFactory.getLogger(BitTorrentCodeDistributor.class);
-    private Tracker tracker;
-    private String hostName;
-    private InetSocketAddress address;
-    private Integer port;
-    protected HashMap<String, Client> clients = new HashMap<String, Client>();
-    protected Double maxDownload;
-    protected Double maxUpload;
-    private Integer seedDuration;
-
-    @Override
-    public void prepare(Map conf) throws Exception {
-        this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
-        this.port = (Integer) conf.get(Config.BITTORRENT_PORT);
-        this.maxDownload = (Double) conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE);
-        this.maxUpload = (Double) conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE);
-        this.seedDuration = (Integer) conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION);
-
-        LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxUpload), format(this.maxDownload));
-
-        LOG.info("Starting bt tracker bound to hostname '{}'", hostName);
-        //using "0.0.0.0" to ensure we bind to all IPV4 network interfaces.
-        this.address = new InetSocketAddress("0.0.0.0", port);
-
-        this.tracker = new Tracker(address);
-        LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl());
-        this.tracker.start();
-    }
-
-    @Override
-    public File upload(String dirPath, String topologyId) throws Exception {
-        File destDir = new File(dirPath);
-        LOG.info("Generating torrent for directory: {}", destDir.getAbsolutePath());
-
-        URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce");
-        LOG.info("Creating torrent with announce URL: {}", uri);
-
-        //TODO: why does listing the directory not work?
-        ArrayList<File> files = new ArrayList<File>();
-        files.add(new File(destDir, "stormjar.jar"));
-        files.add(new File(destDir, "stormconf.ser"));
-        files.add(new File(destDir, "stormcode.ser"));
-
-        Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus");
-        File torrentFile = new File(destDir, "storm-code-distributor.meta");
-        torrent.save(new FileOutputStream(torrentFile));
-        LOG.info("Saved torrent: {}", torrentFile.getAbsolutePath());
-        this.tracker.announce(new TrackedTorrent(torrent));
-
-        Client client = new Client(getInetAddress(), new SharedTorrent(torrent, destDir.getParentFile(), true));
-        this.clients.put(topologyId, client);
-        rebalanceRates();
-        client.share();
-        LOG.info("Seeding torrent...");
-
-        /**
-         * Every time on prepare we need to call tracker.announce for all torrents that
-         * exists in the file system, other wise the tracker will reject any peer request
-         * with unknown torrents. You need to bootstrap trackers.
-         */
-        return torrentFile;
-    }
-
-    @Override
-    public List<File> download(String topologyId, File torrentFile) throws Exception {
-        LOG.info("Initiating BitTorrent download.");
-
-        File destDir = torrentFile.getParentFile();
-        LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath());
-        LOG.info("Saving files to directory: {}", destDir.getAbsolutePath());
-        SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir);
-        Client client = new Client(getInetAddress(), st);
-        this.clients.put(topologyId, client);
-        rebalanceRates();
-        client.share(this.seedDuration);
-
-        //TODO: Should have a timeout after which we just fail the supervisor.
-        if (this.seedDuration == 0) {
-            client.waitForCompletion();
-        } else {
-            LOG.info("Waiting for seeding to begin...");
-            while (client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR) {
-                try {
-                    Thread.sleep(10);
-                } catch (InterruptedException e) {
-                }
-            }
-        }
-        LOG.info("BitTorrent download complete.");
-
-        /**
-         * This should not be needed. currently the bittorrent library uses the torrent name (which is topologyId)
-         * as the folder name and downloads all the files under that folder. so we need to either download
-         * the torrent files under /storm-local/supervisor/stormdist or nimbus/stormdist/ to ensure stormdist becomes
-         * the parent of all torrent files and the actual code will be downloaded under stormdist/topologyId/.
-         * Ideally we should be able to specify that the downloaded files must be downloaded under
-         * given folder only and no extra folder needs to be created.
-         */
-
-        File srcDir = new File(destDir, topologyId);
-        for (File file : srcDir.listFiles()) {
-            FileUtils.copyFileToDirectory(file, destDir);
-            file.delete();
-        }
-        srcDir.delete();
-
-        return Lists.newArrayList(destDir.listFiles());
-    }
-
-    private InetAddress getInetAddress() throws UnknownHostException {
-        for (InetAddress addr : InetAddress.getAllByName(this.hostName)) {
-            if (!addr.isAnyLocalAddress() && !addr.isLoopbackAddress() && !addr.isMulticastAddress()) {
-                return addr;
-            }
-        }
-
-        throw new RuntimeException("No valid InetAddress could be obtained, something really wrong with network configuration.");
-    }
-
-    @Override
-    public short getReplicationCount(String topologyId) {
-        Collection<TrackedTorrent> trackedTorrents = tracker.getTrackedTorrents();
-        for (final TrackedTorrent trackedTorrent : trackedTorrents) {
-            if (trackedTorrent.getName().equals(topologyId)) {
-                return Shorts.checkedCast(trackedTorrent.seeders());
-            }
-        }
-
-        LOG.warn("No torrent found in tracker for topologyId = " + topologyId);
-        return 0;
-    }
-
-    @Override
-    public void cleanup(String topologyId) {
-        LOG.info("Stop seeding/tracking for topology {}", topologyId);
-        Client client = this.clients.remove(topologyId);
-        if (client != null) {
-            Torrent torrent = client.getTorrent();
-            client.stop();
-            this.tracker.remove(torrent);
-        }
-        rebalanceRates();
-    }
-
-    @Override
-    public void close(Map conf) {
-        this.tracker.stop();
-    }
-
-    private synchronized void rebalanceRates() {
-        int clientCount = this.clients.size();
-        if (clientCount > 0) {
-            double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount;
-            double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount;
-            LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount);
-            LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl));
-            for (Client client : this.clients.values()) {
-                client.setMaxDownloadRate(maxDl);
-                client.setMaxUploadRate(maxUl);
-            }
-        }
-    }
-
-    private static String format(double val) {
-        return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val);
-    }
-}