You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/08/24 15:51:56 UTC

[11/50] [abbrv] storm git commit: STORM-166: deleting the bittorrent code distributor as the ttorent library does not support Distrubted hash table for trackerless torrents.

STORM-166: deleting the bittorrent code distributor as the ttorent library does not support Distrubted hash table for trackerless torrents.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dc24e440
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dc24e440
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dc24e440

Branch: refs/heads/master
Commit: dc24e440fa98f62e232e7929fd075387e2e56e4e
Parents: 27d6b4c
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Fri Dec 19 14:37:38 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 19 14:37:38 2014 -0800

----------------------------------------------------------------------
 conf/defaults.yaml                              |   6 -
 storm-core/src/jvm/backtype/storm/Config.java   |  26 ---
 .../BitTorrentCodeDistributor.java              | 191 -------------------
 3 files changed, 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dc24e440/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e189966..9fd9c32 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -50,11 +50,6 @@ storm.auth.simple-acl.admins: []
 storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"
 storm.codedistributor.class: "backtype.storm.codedistributor.LocalFileSystemCodeDistributor"
 
-### bittorrent configuration
-bittorrent.port: 6969
-bittorrent.max.upload.rate: 0.0
-bittorrent.max.download.rate: 0.0
-
 ### nimbus.* configs are for the master
 nimbus.thrift.port: 6627
 nimbus.thrift.threads: 64
@@ -129,7 +124,6 @@ supervisor.monitor.frequency.secs: 3
 #how frequently the supervisor heartbeats to the cluster state (for nimbus)
 supervisor.heartbeat.frequency.secs: 5
 supervisor.enable: true
-supervisor.bittorrent.seed.duration: 0
 supervisor.supervisors: []
 supervisor.supervisors.commands: []
 

http://git-wip-us.apache.org/repos/asf/storm/blob/dc24e440/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 9746565..4678177 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1258,32 +1258,6 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_CODE_DISTRIBUTOR_CLASS_SCHEMA = String.class;
 
     /**
-     * Which port the BitTorrent tracker should bind to.
-     */
-    public static final String BITTORRENT_PORT = "bittorrent.port";
-    public static final Object BITTORRENT_PORT_SCHEMA = Number.class;
-
-    /**
-     * Max upload rate for topology torrents in kB/sec. 0.0 == unlimited.
-     */
-    public static final String BITTORRENT_MAX_UPLOAD_RATE = "bittorrent.max.upload.rate";
-    public static final Object BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class;
-
-    /**
-     * Max download rate for topology torrents in kB/sec. 0.0 == unlimited.
-     */
-    public static final String BITTORRENT_MAX_DOWNLOAD_RATE = "bittorrent.max.download.rate";
-    public static final Object BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class;
-
-    /**
-     * Time in seconds that a supervisor should seed after completing a topology torrent download.
-     * A value of 0 will disable seeding (download only). A value of -1 indicates that the supervisor
-     * should seed indefinitely (until the topology is killed).
-     */
-    public static final String SUPERVISOR_BITTORRENT_SEED_DURATION = "supervisor.bittorrent.seed.duration";
-    public static final Object SUPERVISOR_BITTORRENT_SEED_DURATION_SCHEMA = Number.class;
-
-    /**
      * Minimum number of nimbus hosts where the code must be replicated before leader nimbus
      * is allowed to perform topology activation tasks like setting up heartbeats/assignments
      * and marking the topology as active. default is 0.

http://git-wip-us.apache.org/repos/asf/storm/blob/dc24e440/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java
deleted file mode 100644
index 190cc5f..0000000
--- a/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java
+++ /dev/null
@@ -1,191 +0,0 @@
-package backtype.storm.codedistributor;
-
-import backtype.storm.Config;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Shorts;
-import com.turn.ttorrent.client.Client;
-import com.turn.ttorrent.client.SharedTorrent;
-import com.turn.ttorrent.common.Torrent;
-import com.turn.ttorrent.tracker.TrackedTorrent;
-import com.turn.ttorrent.tracker.Tracker;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.*;
-
-public class BitTorrentCodeDistributor implements ICodeDistributor {
-    private static final Logger LOG = LoggerFactory.getLogger(BitTorrentCodeDistributor.class);
-    private Tracker tracker;
-    private String hostName;
-    private InetSocketAddress address;
-    private Integer port;
-    protected HashMap<String, Client> clients = new HashMap<String, Client>();
-    protected Double maxDownload;
-    protected Double maxUpload;
-    private Integer seedDuration;
-
-    @Override
-    public void prepare(Map conf) throws Exception {
-        this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
-        this.port = (Integer) conf.get(Config.BITTORRENT_PORT);
-        this.maxDownload = (Double) conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE);
-        this.maxUpload = (Double) conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE);
-        this.seedDuration = (Integer) conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION);
-
-        LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxUpload), format(this.maxDownload));
-
-        LOG.info("Starting bt tracker bound to hostname '{}'", hostName);
-        //using "0.0.0.0" to ensure we bind to all IPV4 network interfaces.
-        this.address = new InetSocketAddress("0.0.0.0", port);
-
-        this.tracker = new Tracker(address);
-        LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl());
-        this.tracker.start();
-    }
-
-    @Override
-    public File upload(String dirPath, String topologyId) throws Exception {
-        File destDir = new File(dirPath);
-        LOG.info("Generating torrent for directory: {}", destDir.getAbsolutePath());
-
-        URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce");
-        LOG.info("Creating torrent with announce URL: {}", uri);
-
-        //TODO: why does listing the directory not work?
-        ArrayList<File> files = new ArrayList<File>();
-        files.add(new File(destDir, "stormjar.jar"));
-        files.add(new File(destDir, "stormconf.ser"));
-        files.add(new File(destDir, "stormcode.ser"));
-
-        Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus");
-        File torrentFile = new File(destDir, "storm-code-distributor.meta");
-        torrent.save(new FileOutputStream(torrentFile));
-        LOG.info("Saved torrent: {}", torrentFile.getAbsolutePath());
-        this.tracker.announce(new TrackedTorrent(torrent));
-
-        Client client = new Client(getInetAddress(), new SharedTorrent(torrent, destDir.getParentFile(), true));
-        this.clients.put(topologyId, client);
-        rebalanceRates();
-        client.share();
-        LOG.info("Seeding torrent...");
-
-        /**
-         *
-         * TODO: Every time on prepare we need to call tracker.announce for all torrents that
-         * exists in the file system, other wise the tracker will reject any peer request
-         * with unknown torrents. You need to bootstrap trackers.
-         */
-        return torrentFile;
-    }
-
-    @Override
-    public List<File> download(String topologyId, File torrentFile) throws Exception {
-        LOG.info("Initiating BitTorrent download.");
-
-        File destDir = torrentFile.getParentFile();
-        LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath());
-        LOG.info("Saving files to directory: {}", destDir.getAbsolutePath());
-        SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir);
-        Client client = new Client(getInetAddress(), st);
-        this.clients.put(topologyId, client);
-        rebalanceRates();
-        client.share(this.seedDuration);
-
-        //TODO: Should have a timeout after which we just fail the supervisor.
-        if (this.seedDuration == 0) {
-            client.waitForCompletion();
-        } else {
-            LOG.info("Waiting for seeding to begin...");
-            while (client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR) {
-                try {
-                    Thread.sleep(10);
-                } catch (InterruptedException e) {
-                }
-            }
-        }
-        LOG.info("BitTorrent download complete.");
-
-        /**
-         * This should not be needed. currently the bittorrent library uses the torrent name (which is topologyId)
-         * as the folder name and downloads all the files under that folder. so we need to either download
-         * the torrent files under /storm-local/supervisor/stormdist or nimbus/stormdist/ to ensure stormdist becomes
-         * the parent of all torrent files and the actual code will be downloaded under stormdist/topologyId/.
-         * Ideally we should be able to specify that the downloaded files must be downloaded under
-         * given folder only and no extra folder needs to be created.
-         */
-
-        File srcDir = new File(destDir, topologyId);
-        for (File file : srcDir.listFiles()) {
-            FileUtils.copyFileToDirectory(file, destDir);
-            file.delete();
-        }
-        srcDir.delete();
-
-        return Lists.newArrayList(destDir.listFiles());
-    }
-
-    private InetAddress getInetAddress() throws UnknownHostException {
-        for (InetAddress addr : InetAddress.getAllByName(this.hostName)) {
-            if (!addr.isAnyLocalAddress() && !addr.isLoopbackAddress() && !addr.isMulticastAddress()) {
-                return addr;
-            }
-        }
-
-        throw new RuntimeException("No valid InetAddress could be obtained, something really wrong with network configuration.");
-    }
-
-    @Override
-    public short getReplicationCount(String topologyId) {
-        Collection<TrackedTorrent> trackedTorrents = tracker.getTrackedTorrents();
-        for (final TrackedTorrent trackedTorrent : trackedTorrents) {
-            if (trackedTorrent.getName().equals(topologyId)) {
-                return Shorts.checkedCast(trackedTorrent.seeders());
-            }
-        }
-
-        LOG.warn("No torrent found in tracker for topologyId = " + topologyId);
-        return 0;
-    }
-
-    @Override
-    public void cleanup(String topologyId) {
-        LOG.info("Stop seeding/tracking for topology {}", topologyId);
-        Client client = this.clients.remove(topologyId);
-        if (client != null) {
-            Torrent torrent = client.getTorrent();
-            client.stop();
-            this.tracker.remove(torrent);
-        }
-        rebalanceRates();
-    }
-
-    @Override
-    public void close(Map conf) {
-        this.tracker.stop();
-    }
-
-    private synchronized void rebalanceRates() {
-        int clientCount = this.clients.size();
-        if (clientCount > 0) {
-            double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount;
-            double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount;
-            LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount);
-            LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl));
-            for (Client client : this.clients.values()) {
-                client.setMaxDownloadRate(maxDl);
-                client.setMaxUploadRate(maxUl);
-            }
-        }
-    }
-
-    private static String format(double val) {
-        return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val);
-    }
-}