You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/03/20 22:01:07 UTC
[02/50] [abbrv] storm git commit: Bittorrent binding interface needs
to ignore loopback and broadcast interface.
Bittorrent binding interface needs to ignore loopback and broadcast interface.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/02c27a9a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/02c27a9a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/02c27a9a
Branch: refs/heads/0.11.x-branch
Commit: 02c27a9a84813082fe30e6ffcd52425f85b99176
Parents: fa69f2a
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Dec 15 16:30:21 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Dec 15 16:30:21 2014 -0800
----------------------------------------------------------------------
.../torrent/BitTorrentCodeDistributor.java | 79 +++++++++++---------
1 file changed, 45 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/02c27a9a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java b/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
index 4fe45a6..f4512b4 100644
--- a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
+++ b/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
@@ -9,6 +9,7 @@ import com.turn.ttorrent.client.SharedTorrent;
import com.turn.ttorrent.common.Torrent;
import com.turn.ttorrent.tracker.TrackedTorrent;
import com.turn.ttorrent.tracker.Tracker;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -17,17 +18,14 @@ import java.io.FileOutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
-import java.nio.file.CopyOption;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.net.UnknownHostException;
import java.util.*;
public class BitTorrentCodeDistributor implements ICodeDistributor {
private static final Logger LOG = LoggerFactory.getLogger(BitTorrentCodeDistributor.class);
private Tracker tracker;
private String hostName;
- private InetAddress host;
+ private InetSocketAddress address;
private Integer port;
protected HashMap<String, Client> clients = new HashMap<String, Client>();
protected Double maxDownload;
@@ -37,18 +35,18 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
@Override
public void prepare(Map conf) throws Exception {
this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
- this.port = (Integer)conf.get(Config.BITTORRENT_PORT);
- this.host = InetAddress.getLocalHost();
- this.maxDownload = (Double)conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE);
- this.maxUpload = (Double)conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE);
- this.seedDuration = (Integer)conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION);
+ this.port = (Integer) conf.get(Config.BITTORRENT_PORT);
+ this.maxDownload = (Double) conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE);
+ this.maxUpload = (Double) conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE);
+ this.seedDuration = (Integer) conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION);
LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxUpload), format(this.maxDownload));
LOG.info("Starting bt tracker bound to hostname '{}'", hostName);
- //using "::" to ensure we bind to all IPV4 and IPV6 network interfaces.
- InetSocketAddress socketAddr = new InetSocketAddress("::", port);
- this.tracker = new Tracker(socketAddr);
+ //using "0.0.0.0" to ensure we bind to all IPV4 network interfaces.
+ this.address = new InetSocketAddress("0.0.0.0", port);
+
+ this.tracker = new Tracker(address);
LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl());
this.tracker.start();
}
@@ -60,6 +58,8 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce");
LOG.info("Creating torrent with announce URL: {}", uri);
+
+ //TODO: why does listing the directory not work?
ArrayList<File> files = new ArrayList<File>();
files.add(new File(destDir, "stormjar.jar"));
files.add(new File(destDir, "stormconf.ser"));
@@ -68,41 +68,42 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus");
File torrentFile = new File(destDir, "storm-code-distributor.meta");
torrent.save(new FileOutputStream(torrentFile));
- LOG.info("Saved torrent: {}" + torrentFile.getAbsolutePath());
+ LOG.info("Saved torrent: {}", torrentFile.getAbsolutePath());
this.tracker.announce(new TrackedTorrent(torrent));
- LOG.info("Torrent announced to tracker.");
- Client client = new Client(host, new SharedTorrent(torrent, destDir.getParentFile(), true));
+
+ Client client = new Client(getInetAddress(), new SharedTorrent(torrent, destDir.getParentFile(), true));
this.clients.put(topologyId, client);
rebalanceRates();
client.share();
LOG.info("Seeding torrent...");
+ /**
+ * Every time prepare is called we need to call tracker.announce for all torrents that
+ * exist in the file system; otherwise the tracker will reject any peer request
+ * for unknown torrents. You need to bootstrap trackers.
+ */
return torrentFile;
}
@Override
public List<File> download(String topologyId, File torrentFile) throws Exception {
LOG.info("Initiating BitTorrent download.");
- InetAddress netAddr = InetAddress.getLocalHost();
- //TODO: This should be configured, the assumption that the files should be downloaded
- //in parent folder is probably not best one.
File destDir = torrentFile.getParentFile();
LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath());
LOG.info("Saving files to directory: {}", destDir.getAbsolutePath());
SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir);
-
- Client client = new Client(netAddr, st);
+ Client client = new Client(getInetAddress(), st);
this.clients.put(topologyId, client);
rebalanceRates();
client.share(this.seedDuration);
//TODO: Should have a timeout after which we just fail the supervisor.
- if(this.seedDuration == 0) {
+ if (this.seedDuration == 0) {
client.waitForCompletion();
} else {
LOG.info("Waiting for seeding to begin...");
- while(client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR){
+ while (client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
@@ -120,9 +121,9 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
* given folder only and no extra folder needs to be created.
*/
- File srcDir = Paths.get(destDir.getPath(), topologyId).toFile();
- for(File file : srcDir.listFiles()) {
- Files.copy(file.toPath(), destDir.toPath().resolve(file.getName()));
+ File srcDir = new File(destDir, topologyId);
+ for (File file : srcDir.listFiles()) {
+ FileUtils.copyFileToDirectory(file, destDir);
file.delete();
}
srcDir.delete();
@@ -130,15 +131,25 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
return Lists.newArrayList(destDir.listFiles());
}
+ private InetAddress getInetAddress() throws UnknownHostException {
+ for (InetAddress addr : InetAddress.getAllByName(this.hostName)) {
+ if (!addr.isAnyLocalAddress() && !addr.isLoopbackAddress() && !addr.isMulticastAddress()) {
+ return addr;
+ }
+ }
+ // Every address resolved for hostName was a wildcard, loopback or multicast address.
+ throw new RuntimeException("No valid InetAddress could be obtained, something really wrong with network configuration.");
+ }
+
@Override
public short getReplicationCount(String topologyId) {
Collection<TrackedTorrent> trackedTorrents = tracker.getTrackedTorrents();
- //TODO: this needs to be tested.
- for(TrackedTorrent trackedTorrent: trackedTorrents) {
- if(trackedTorrent.getName().equals(topologyId)) {
+ for (final TrackedTorrent trackedTorrent : trackedTorrents) {
+ if (trackedTorrent.getName().equals(topologyId)) {
return Shorts.checkedCast(trackedTorrent.seeders());
}
}
+
LOG.warn("No torrent found in tracker for topologyId = " + topologyId);
return 0;
}
@@ -147,7 +158,7 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
public void cleanup(String topologyId) {
LOG.info("Stop seeding/tracking for topology {}", topologyId);
Client client = this.clients.remove(topologyId);
- if(client != null){
+ if (client != null) {
Torrent torrent = client.getTorrent();
client.stop();
this.tracker.remove(torrent);
@@ -160,21 +171,21 @@ public class BitTorrentCodeDistributor implements ICodeDistributor {
this.tracker.stop();
}
- private synchronized void rebalanceRates(){
+ private synchronized void rebalanceRates() {
int clientCount = this.clients.size();
- if(clientCount > 0){
+ if (clientCount > 0) {
double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount;
double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount;
LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount);
LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl));
- for(Client client : this.clients.values()) {
+ for (Client client : this.clients.values()) {
client.setMaxDownloadRate(maxDl);
client.setMaxUploadRate(maxUl);
}
}
}
- private static String format(double val){
+ private static String format(double val) {
return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val);
}
}