You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/03/20 22:01:07 UTC

[02/50] [abbrv] storm git commit: Bittorrent binding interface needs to ignore loopback and broadcast interface.

Bittorrent binding interface needs to ignore loopback and broadcast interface.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/02c27a9a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/02c27a9a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/02c27a9a

Branch: refs/heads/0.11.x-branch
Commit: 02c27a9a84813082fe30e6ffcd52425f85b99176
Parents: fa69f2a
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Dec 15 16:30:21 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Dec 15 16:30:21 2014 -0800

----------------------------------------------------------------------
 .../torrent/BitTorrentCodeDistributor.java      | 79 +++++++++++---------
 1 file changed, 45 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/02c27a9a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java b/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
index 4fe45a6..f4512b4 100644
--- a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
+++ b/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
@@ -9,6 +9,7 @@ import com.turn.ttorrent.client.SharedTorrent;
 import com.turn.ttorrent.common.Torrent;
 import com.turn.ttorrent.tracker.TrackedTorrent;
 import com.turn.ttorrent.tracker.Tracker;
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -17,17 +18,14 @@ import java.io.FileOutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.nio.file.CopyOption;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.net.UnknownHostException;
 import java.util.*;
 
 public class BitTorrentCodeDistributor implements ICodeDistributor {
     private static final Logger LOG = LoggerFactory.getLogger(BitTorrentCodeDistributor.class);
     private Tracker tracker;
     private String hostName;
-    private InetAddress host;
+    private InetSocketAddress address;
     private Integer port;
     protected HashMap<String, Client> clients = new HashMap<String, Client>();
     protected Double maxDownload;
@@ -37,18 +35,18 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
     @Override
     public void prepare(Map conf) throws Exception {
         this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
-        this.port = (Integer)conf.get(Config.BITTORRENT_PORT);
-        this.host = InetAddress.getLocalHost();
-        this.maxDownload = (Double)conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE);
-        this.maxUpload = (Double)conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE);
-        this.seedDuration = (Integer)conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION);
+        this.port = (Integer) conf.get(Config.BITTORRENT_PORT);
+        this.maxDownload = (Double) conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE);
+        this.maxUpload = (Double) conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE);
+        this.seedDuration = (Integer) conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION);
 
         LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxUpload), format(this.maxDownload));
 
         LOG.info("Starting bt tracker bound to hostname '{}'", hostName);
-        //using "::" to ensure we bind to all IPV4 and IPV6 network interfaces.
-        InetSocketAddress socketAddr = new InetSocketAddress("::", port);
-        this.tracker = new Tracker(socketAddr);
+        //using "0.0.0.0" to ensure we bind to all IPV4 network interfaces.
+        this.address = new InetSocketAddress("0.0.0.0", port);
+
+        this.tracker = new Tracker(address);
         LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl());
         this.tracker.start();
     }
@@ -60,6 +58,8 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
 
         URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce");
         LOG.info("Creating torrent with announce URL: {}", uri);
+
+        //TODO: why does listing the directory not work?
         ArrayList<File> files = new ArrayList<File>();
         files.add(new File(destDir, "stormjar.jar"));
         files.add(new File(destDir, "stormconf.ser"));
@@ -68,41 +68,42 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
         Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus");
         File torrentFile = new File(destDir, "storm-code-distributor.meta");
         torrent.save(new FileOutputStream(torrentFile));
-        LOG.info("Saved torrent: {}" + torrentFile.getAbsolutePath());
+        LOG.info("Saved torrent: {}", torrentFile.getAbsolutePath());
         this.tracker.announce(new TrackedTorrent(torrent));
-        LOG.info("Torrent announced to tracker.");
-        Client client = new Client(host, new SharedTorrent(torrent, destDir.getParentFile(), true));
+
+        Client client = new Client(getInetAddress(), new SharedTorrent(torrent, destDir.getParentFile(), true));
         this.clients.put(topologyId, client);
         rebalanceRates();
         client.share();
         LOG.info("Seeding torrent...");
 
+        /**
+         * Every time on prepare we need to call tracker.announce for all torrents that
+         * exist in the file system, otherwise the tracker will reject any peer request
+         * with unknown torrents. You need to bootstrap trackers.
+         */
         return torrentFile;
     }
 
     @Override
     public List<File> download(String topologyId, File torrentFile) throws Exception {
         LOG.info("Initiating BitTorrent download.");
-        InetAddress netAddr = InetAddress.getLocalHost();
 
-        //TODO: This should be configured, the assumption that the files should be downloaded
-        //in parent folder is probably not best one.
         File destDir = torrentFile.getParentFile();
         LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath());
         LOG.info("Saving files to directory: {}", destDir.getAbsolutePath());
         SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir);
-
-        Client client = new Client(netAddr, st);
+        Client client = new Client(getInetAddress(), st);
         this.clients.put(topologyId, client);
         rebalanceRates();
         client.share(this.seedDuration);
 
         //TODO: Should have a timeout after which we just fail the supervisor.
-        if(this.seedDuration == 0) {
+        if (this.seedDuration == 0) {
             client.waitForCompletion();
         } else {
             LOG.info("Waiting for seeding to begin...");
-            while(client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR){
+            while (client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR) {
                 try {
                     Thread.sleep(10);
                 } catch (InterruptedException e) {
@@ -120,9 +121,9 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
          * given folder only and no extra folder needs to be created.
          */
 
-        File srcDir = Paths.get(destDir.getPath(), topologyId).toFile();
-        for(File file : srcDir.listFiles()) {
-            Files.copy(file.toPath(), destDir.toPath().resolve(file.getName()));
+        File srcDir = new File(destDir, topologyId);
+        for (File file : srcDir.listFiles()) {
+            FileUtils.copyFileToDirectory(file, destDir);
             file.delete();
         }
         srcDir.delete();
@@ -130,15 +131,25 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
         return Lists.newArrayList(destDir.listFiles());
     }
 
+    private InetAddress getInetAddress() throws UnknownHostException {
+        for (InetAddress addr : InetAddress.getAllByName(this.hostName)) {
+            if (!addr.isAnyLocalAddress() && !addr.isLoopbackAddress() && !addr.isMulticastAddress()) {
+                return addr;
+            }
+        }
+
+        throw new RuntimeException("No valid InetAddress could be obtained, something really wrong with network configuration.");
+    }
+
     @Override
     public short getReplicationCount(String topologyId) {
         Collection<TrackedTorrent> trackedTorrents = tracker.getTrackedTorrents();
-        //TODO: this needs to be tested.
-        for(TrackedTorrent trackedTorrent: trackedTorrents) {
-            if(trackedTorrent.getName().equals(topologyId)) {
+        for (final TrackedTorrent trackedTorrent : trackedTorrents) {
+            if (trackedTorrent.getName().equals(topologyId)) {
                 return Shorts.checkedCast(trackedTorrent.seeders());
             }
         }
+
         LOG.warn("No torrent found in tracker for topologyId = " + topologyId);
         return 0;
     }
@@ -147,7 +158,7 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
     public void cleanup(String topologyId) {
         LOG.info("Stop seeding/tracking for topology {}", topologyId);
         Client client = this.clients.remove(topologyId);
-        if(client != null){
+        if (client != null) {
             Torrent torrent = client.getTorrent();
             client.stop();
             this.tracker.remove(torrent);
@@ -160,21 +171,21 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
         this.tracker.stop();
     }
 
-    private synchronized void rebalanceRates(){
+    private synchronized void rebalanceRates() {
         int clientCount = this.clients.size();
-        if(clientCount > 0){
+        if (clientCount > 0) {
             double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount;
             double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount;
             LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount);
             LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl));
-            for(Client client : this.clients.values()) {
+            for (Client client : this.clients.values()) {
                 client.setMaxDownloadRate(maxDl);
                 client.setMaxUploadRate(maxUl);
             }
         }
     }
 
-    private static String format(double val){
+    private static String format(double val) {
         return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val);
     }
 }