You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/03/20 22:01:17 UTC
[12/50] [abbrv] storm git commit: defaulting to
LocalFileSystemCodeDistributor,
moving all code distributors to their own packages.
defaulting to LocalFileSystemCodeDistributor, moving all code distributors to their own packages.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/547ed491
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/547ed491
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/547ed491
Branch: refs/heads/0.11.x-branch
Commit: 547ed4916761508040f056c6812e047e04e8db3a
Parents: e5a96f4
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Dec 18 16:39:25 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Thu Dec 18 16:39:25 2014 -0800
----------------------------------------------------------------------
conf/defaults.yaml | 2 +-
.../ha/codedistributor/HDFSCodeDistributor.java | 2 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 2 +
storm-core/src/jvm/backtype/storm/Config.java | 2 +-
.../BitTorrentCodeDistributor.java | 191 +++++++++++++++++++
.../storm/codedistributor/ICodeDistributor.java | 57 ++++++
.../LocalFileSystemCodeDistributor.java | 105 ++++++++++
.../backtype/storm/nimbus/ICodeDistributor.java | 57 ------
.../nimbus/LocalFileSystemCodeDistributor.java | 106 ----------
.../torrent/BitTorrentCodeDistributor.java | 191 -------------------
10 files changed, 358 insertions(+), 357 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 7757b24..07230dc 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -48,7 +48,7 @@ storm.auth.simple-acl.users: []
storm.auth.simple-acl.users.commands: []
storm.auth.simple-acl.admins: []
storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"
-storm.codedistributor.class: "backtype.storm.torrent.BitTorrentCodeDistributor"
+storm.codedistributor.class: "backtype.storm.codedistributor.LocalFileSystemCodeDistributor"
### bittorrent configuration
bittorrent.port: 6969
http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java
index 9935ae7..f6736bd 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java
@@ -1,6 +1,6 @@
package org.apache.storm.hdfs.ha.codedistributor;
-import backtype.storm.nimbus.ICodeDistributor;
+import backtype.storm.codedistributor.ICodeDistributor;
import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.Validate;
http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index fbe8e77..5a3fa12 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -65,6 +65,8 @@
(defmulti sync-code cluster-mode)
;;TODO we should try genclass for zkLeaderElector and just set NIMBUS-LEADER-ELECTOR-CLASS in defaults.yaml
+;;TODO we need to pass ACLs, looks like not possible as leader-latch does not work with ACLs
+;;TODO we need to call .prepare or just get rid of the interface altogether.
(defn mk-leader-elector [conf]
(if (conf NIMBUS-LEADER-ELECTOR-CLASS)
(do (log-message "Using custom Leade elector: " (conf NIMBUS-LEADER-ELECTOR-CLASS))
http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 16216ea..81046b3 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1257,7 +1257,7 @@ public class Config extends HashMap<String, Object> {
public static final Object TOPOLOGY_HDFS_URI_SCHEMA = String.class;
/**
- * Which implementation of {@link backtype.storm.nimbus.ICodeDistributor} should be used by storm for code
+ * Which implementation of {@link backtype.storm.codedistributor.ICodeDistributor} should be used by storm for code
* distribution.
*/
public static final String STORM_CODE_DISTRIBUTOR_CLASS = "storm.codedistributor.class";
http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java
new file mode 100644
index 0000000..190cc5f
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java
@@ -0,0 +1,191 @@
+package backtype.storm.codedistributor;
+
+import backtype.storm.Config;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Shorts;
+import com.turn.ttorrent.client.Client;
+import com.turn.ttorrent.client.SharedTorrent;
+import com.turn.ttorrent.common.Torrent;
+import com.turn.ttorrent.tracker.TrackedTorrent;
+import com.turn.ttorrent.tracker.Tracker;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.*;
+
+/**
+ * {@link ICodeDistributor} that distributes topology code over BitTorrent using the ttorrent library.
+ * <p>
+ * {@link #prepare(Map)} starts an embedded tracker bound to all IPv4 interfaces on {@link Config#BITTORRENT_PORT}.
+ * {@link #upload(String, String)} creates a torrent for a topology's code directory, announces it to that tracker
+ * and seeds it. {@link #download(String, File)} fetches the code described by such a torrent file; depending on
+ * {@link Config#SUPERVISOR_BITTORRENT_SEED_DURATION} it either waits for the download to finish (0) or returns
+ * once the client has started seeding. The configured upload/download limits are split evenly across all
+ * torrents currently held in {@link #clients}.
+ * <p>
+ * NOTE(review): {@link #clients} is a plain {@link HashMap} mutated by upload/download/cleanup without locking,
+ * while {@link #rebalanceRates()} iterates it under the instance lock; confirm callers are single-threaded.
+ */
+public class BitTorrentCodeDistributor implements ICodeDistributor {
+    private static final Logger LOG = LoggerFactory.getLogger(BitTorrentCodeDistributor.class);
+    private Tracker tracker;
+    private String hostName;
+    private InetSocketAddress address;
+    private Integer port;
+    protected HashMap<String, Client> clients = new HashMap<String, Client>();
+    protected Double maxDownload;
+    protected Double maxUpload;
+    private Integer seedDuration;
+
+    /**
+     * Reads the BitTorrent settings and starts the embedded tracker.
+     *
+     * @param conf storm configuration, expected to contain {@link Config#BITTORRENT_PORT},
+     *             {@link Config#BITTORRENT_MAX_DOWNLOAD_RATE}, {@link Config#BITTORRENT_MAX_UPLOAD_RATE}
+     *             and {@link Config#SUPERVISOR_BITTORRENT_SEED_DURATION}
+     * @throws Exception if the local host name cannot be resolved or the tracker cannot start
+     */
+    @Override
+    public void prepare(Map conf) throws Exception {
+        this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
+        this.port = (Integer) conf.get(Config.BITTORRENT_PORT);
+        this.maxDownload = (Double) conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE);
+        this.maxUpload = (Double) conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE);
+        this.seedDuration = (Integer) conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION);
+
+        LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxUpload), format(this.maxDownload));
+
+        LOG.info("Starting bt tracker bound to hostname '{}'", hostName);
+        //using "0.0.0.0" to ensure we bind to all IPV4 network interfaces.
+        this.address = new InetSocketAddress("0.0.0.0", port);
+
+        this.tracker = new Tracker(address);
+        LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl());
+        this.tracker.start();
+    }
+
+    /**
+     * Creates a torrent for the code in {@code dirPath}, announces it to the embedded tracker and starts seeding it.
+     *
+     * @param dirPath    directory holding stormjar.jar, stormconf.ser and stormcode.ser
+     * @param topologyId topology the code belongs to; used as the key of the seeding client
+     * @return the saved torrent file ({@code storm-code-distributor.meta} inside {@code dirPath})
+     * @throws Exception if the torrent cannot be created, saved or shared
+     */
+    @Override
+    public File upload(String dirPath, String topologyId) throws Exception {
+        File destDir = new File(dirPath);
+        LOG.info("Generating torrent for directory: {}", destDir.getAbsolutePath());
+
+        URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce");
+        LOG.info("Creating torrent with announce URL: {}", uri);
+
+        //TODO: why does listing the directory not work?
+        List<File> files = new ArrayList<File>();
+        files.add(new File(destDir, "stormjar.jar"));
+        files.add(new File(destDir, "stormconf.ser"));
+        files.add(new File(destDir, "stormcode.ser"));
+
+        Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus");
+        File torrentFile = new File(destDir, "storm-code-distributor.meta");
+        // Close the stream explicitly so the torrent file handle is not leaked.
+        FileOutputStream torrentOut = new FileOutputStream(torrentFile);
+        try {
+            torrent.save(torrentOut);
+        } finally {
+            torrentOut.close();
+        }
+        LOG.info("Saved torrent: {}", torrentFile.getAbsolutePath());
+        this.tracker.announce(new TrackedTorrent(torrent));
+
+        // The torrent's files live under destDir, so the shared torrent is rooted at its parent.
+        Client client = new Client(getInetAddress(), new SharedTorrent(torrent, destDir.getParentFile(), true));
+        this.clients.put(topologyId, client);
+        rebalanceRates();
+        client.share();
+        LOG.info("Seeding torrent...");
+
+        // TODO: Every time on prepare we need to call tracker.announce for all torrents that
+        // exist in the file system, otherwise the tracker will reject any peer request
+        // with unknown torrents. You need to bootstrap trackers.
+        return torrentFile;
+    }
+
+    /**
+     * Downloads the code described by {@code torrentFile} into the torrent file's parent directory.
+     *
+     * @param topologyId  topology whose code is downloaded; also the folder name ttorrent downloads into
+     * @param torrentFile torrent (meta) file produced by {@link #upload(String, String)}
+     * @return all files in the torrent file's parent directory after the download
+     * @throws InterruptedException if interrupted while waiting for seeding to begin
+     * @throws RuntimeException     if the BitTorrent client ends in the ERROR state or the downloaded
+     *                              folder is missing
+     * @throws Exception            on any other download or file-copy failure
+     */
+    @Override
+    public List<File> download(String topologyId, File torrentFile) throws Exception {
+        LOG.info("Initiating BitTorrent download.");
+
+        File destDir = torrentFile.getParentFile();
+        LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath());
+        LOG.info("Saving files to directory: {}", destDir.getAbsolutePath());
+        SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir);
+        Client client = new Client(getInetAddress(), st);
+        this.clients.put(topologyId, client);
+        rebalanceRates();
+        client.share(this.seedDuration);
+
+        //TODO: Should have a timeout after which we just fail the supervisor.
+        if (this.seedDuration == 0) {
+            client.waitForCompletion();
+        } else {
+            LOG.info("Waiting for seeding to begin...");
+            while (client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR) {
+                // An interrupt propagates to the caller instead of being swallowed (which also cleared the flag).
+                Thread.sleep(10);
+            }
+        }
+        // Both wait paths can end in ERROR; do not report success or move files in that case.
+        if (client.getState() == Client.ClientState.ERROR) {
+            throw new RuntimeException("BitTorrent download failed for topology " + topologyId
+                    + " using torrent " + torrentFile.getAbsolutePath());
+        }
+        LOG.info("BitTorrent download complete.");
+
+        // This should not be needed. Currently the bittorrent library uses the torrent name (which is topologyId)
+        // as the folder name and downloads all the files under that folder. So we need to either download
+        // the torrent files under /storm-local/supervisor/stormdist or nimbus/stormdist/ to ensure stormdist becomes
+        // the parent of all torrent files and the actual code will be downloaded under stormdist/topologyId/.
+        // Ideally we should be able to specify that the downloaded files must be downloaded under
+        // the given folder only and no extra folder needs to be created.
+        File srcDir = new File(destDir, topologyId);
+        File[] downloaded = srcDir.listFiles();
+        if (downloaded == null) {
+            throw new RuntimeException("Downloaded code directory " + srcDir.getAbsolutePath()
+                    + " is missing or cannot be listed.");
+        }
+        for (File file : downloaded) {
+            FileUtils.copyFileToDirectory(file, destDir);
+            file.delete();
+        }
+        srcDir.delete();
+
+        return Lists.newArrayList(destDir.listFiles());
+    }
+
+    /**
+     * Returns the first address of {@link #hostName} that is not a wildcard, loopback or multicast address.
+     *
+     * @throws UnknownHostException if the host name cannot be resolved
+     * @throws RuntimeException     if no suitable address exists
+     */
+    private InetAddress getInetAddress() throws UnknownHostException {
+        for (InetAddress addr : InetAddress.getAllByName(this.hostName)) {
+            if (!addr.isAnyLocalAddress() && !addr.isLoopbackAddress() && !addr.isMulticastAddress()) {
+                return addr;
+            }
+        }
+
+        throw new RuntimeException("No valid InetAddress could be obtained, something really wrong with network configuration.");
+    }
+
+    /**
+     * Returns the number of seeders the embedded tracker reports for the torrent named {@code topologyId},
+     * or 0 if the tracker does not know such a torrent.
+     *
+     * @throws IllegalArgumentException if the seeder count does not fit in a {@code short}
+     */
+    @Override
+    public short getReplicationCount(String topologyId) {
+        Collection<TrackedTorrent> trackedTorrents = tracker.getTrackedTorrents();
+        for (final TrackedTorrent trackedTorrent : trackedTorrents) {
+            if (trackedTorrent.getName().equals(topologyId)) {
+                return Shorts.checkedCast(trackedTorrent.seeders());
+            }
+        }
+
+        LOG.warn("No torrent found in tracker for topologyId = {}", topologyId);
+        return 0;
+    }
+
+    /**
+     * Stops the client sharing {@code topologyId} (if any), removes its torrent from the tracker
+     * and redistributes bandwidth over the remaining clients.
+     */
+    @Override
+    public void cleanup(String topologyId) {
+        LOG.info("Stop seeding/tracking for topology {}", topologyId);
+        Client client = this.clients.remove(topologyId);
+        if (client != null) {
+            Torrent torrent = client.getTorrent();
+            client.stop();
+            this.tracker.remove(torrent);
+        }
+        rebalanceRates();
+    }
+
+    /**
+     * Stops every client that is still sharing a torrent, then stops the embedded tracker.
+     */
+    @Override
+    public void close(Map conf) {
+        // Stopping only the tracker would leave every seeding/downloading client thread running.
+        for (Client client : this.clients.values()) {
+            client.stop();
+        }
+        this.clients.clear();
+        this.tracker.stop();
+    }
+
+    /**
+     * Splits the configured max download/upload rates evenly across all current clients.
+     * A non-positive limit means "unlimited" and is applied to every client unchanged.
+     */
+    private synchronized void rebalanceRates() {
+        int clientCount = this.clients.size();
+        if (clientCount > 0) {
+            double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount;
+            double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount;
+            LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount);
+            LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl));
+            for (Client client : this.clients.values()) {
+                client.setMaxDownloadRate(maxDl);
+                client.setMaxUploadRate(maxUl);
+            }
+        }
+    }
+
+    /** Formats a rate for logging; non-positive values are rendered as "UNLIMITED". */
+    private static String format(double val) {
+        return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java
new file mode 100644
index 0000000..f536a2a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java
@@ -0,0 +1,57 @@
+package backtype.storm.codedistributor;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interface responsible for distributing topology code in the cluster.
+ * <p>
+ * The implementation storm uses is selected with {@code storm.codedistributor.class}
+ * ({@code Config.STORM_CODE_DISTRIBUTOR_CLASS}). Implementations expect {@link #prepare(Map)} to be
+ * called before any other method and {@link #close(Map)} once the distributor is no longer needed.
+ */
+public interface ICodeDistributor {
+    /**
+     * Prepares this code distributor.
+     *
+     * @param conf the storm configuration
+     * @throws Exception if the distributor cannot be initialized
+     */
+    void prepare(Map conf) throws Exception;
+
+    /**
+     * Performs the actual upload of the code to the distribution implementation.
+     * <p>
+     * Returns a meta file holding enough information for a downloader to fetch the code: e.g. for
+     * BitTorrent it is a torrent file; for something like HDFS or S3 it might hold the actual
+     * directory where all the code is put.
+     *
+     * @param dirPath    directory where all the code to be distributed exists
+     * @param topologyId the topology for which the meta file needs to be created
+     * @return the meta file to hand to {@link #download(String, File)}
+     * @throws Exception if the upload fails
+     */
+    File upload(String dirPath, String topologyId) throws Exception;
+
+    /**
+     * Given the topology id and meta file, downloads the actual code.
+     *
+     * @param topologyId the topology whose code should be downloaded
+     * @param metafile   the meta file produced by {@link #upload(String, String)}
+     * @return the list of downloaded files
+     * @throws Exception if the code cannot be downloaded
+     */
+    List<File> download(String topologyId, File metafile) throws Exception;
+
+    /**
+     * Returns the number of nodes to which the code is already replicated for the topology.
+     *
+     * @param topologyId the topology to check
+     * @return the replication count
+     * @throws Exception if the count cannot be determined
+     */
+    short getReplicationCount(String topologyId) throws Exception;
+
+    /**
+     * Releases whatever the distributor holds for the given topology (e.g. stops seeding its code).
+     *
+     * @param topologyId the topology to clean up
+     * @throws IOException if the cleanup fails
+     */
+    void cleanup(String topologyId) throws IOException;
+
+    /**
+     * Closes this distributor.
+     *
+     * @param conf the storm configuration
+     */
+    void close(Map conf);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java
new file mode 100644
index 0000000..96422e2
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java
@@ -0,0 +1,105 @@
+package backtype.storm.codedistributor;
+
+import backtype.storm.utils.ZookeeperAuthInfo;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static backtype.storm.Config.*;
+import static backtype.storm.utils.Utils.downloadFromHost;
+import static backtype.storm.utils.Utils.newCurator;
+
+
+/**
+ * {@link ICodeDistributor} that serves code directly from the local file system of the hosts holding it.
+ * <p>
+ * {@link #upload(String, String)} only writes a meta file listing the absolute paths of the files in the
+ * code directory. {@link #download(String, File)} reads that meta file and fetches every listed file from
+ * one of the endpoints found as children of the ZooKeeper node {@code /code-distributor/<topologyId>}
+ * (child names have the form {@code host:port}), trying them in order until one succeeds.
+ */
+public class LocalFileSystemCodeDistributor implements ICodeDistributor {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystemCodeDistributor.class);
+    private CuratorFramework zkClient;
+    private Map conf;
+
+    /**
+     * Stores {@code conf} and connects a Curator client to the configured ZooKeeper servers/root.
+     *
+     * @param conf storm configuration; read for the ZooKeeper servers, port, root and auth info
+     * @throws Exception if the Curator client cannot be created
+     */
+    @Override
+    public void prepare(Map conf) throws Exception {
+        this.conf = conf;
+        List<String> zkServers = (List<String>) conf.get(STORM_ZOOKEEPER_SERVERS);
+        int port = (Integer) conf.get(STORM_ZOOKEEPER_PORT);
+        ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
+        zkClient = newCurator(conf, zkServers, port, (String) conf.get(STORM_ZOOKEEPER_ROOT), zkAuthInfo);
+        zkClient.start();
+    }
+
+    /**
+     * Writes {@code storm-code-distributor.meta} into {@code dirPath}, listing the absolute path of every
+     * file in that directory. An already existing meta file is left untouched.
+     *
+     * @param dirPath    directory holding the code to distribute
+     * @param topologyId unused by this implementation
+     * @return the meta file
+     * @throws IOException if {@code dirPath} cannot be listed or the meta file cannot be written
+     */
+    @Override
+    public File upload(String dirPath, String topologyId) throws Exception {
+        File destDir = new File(dirPath);
+        File[] localFiles = destDir.listFiles();
+        if (localFiles == null) {
+            throw new IOException("Code directory " + destDir.getAbsolutePath() + " does not exist or cannot be listed.");
+        }
+
+        // Listed before the meta file is created, so a freshly created meta file does not reference itself.
+        List<String> filePaths = new ArrayList<String>(localFiles.length);
+        for (File file : localFiles) {
+            filePaths.add(file.getAbsolutePath());
+        }
+
+        File metaFile = new File(destDir, "storm-code-distributor.meta");
+        if (metaFile.createNewFile()) {
+            FileUtils.writeLines(metaFile, filePaths);
+            LOG.info("Created meta file {} upload successful.", metaFile.getAbsolutePath());
+        } else {
+            LOG.warn("metafile {} already exists.", metaFile.getAbsolutePath());
+        }
+
+        return metaFile;
+    }
+
+    /**
+     * Downloads every file listed in {@code metafile} into the meta file's parent directory.
+     *
+     * @param topologyid topology whose endpoints are read from ZooKeeper
+     * @param metafile   meta file produced by {@link #upload(String, String)}
+     * @return the local copies of the listed files, in meta-file order
+     * @throws RuntimeException if some file cannot be downloaded from any endpoint
+     * @throws Exception        if ZooKeeper or the meta file cannot be read
+     */
+    @Override
+    public List<File> download(String topologyid, File metafile) throws Exception {
+        List<String> hostInfos = zkClient.getChildren().forPath("/code-distributor/" + topologyid);
+        File destDir = metafile.getParentFile();
+        List<File> downloadedFiles = Lists.newArrayList();
+        for (String absoluteFilePath : FileUtils.readLines(metafile)) {
+            File localFile = new File(destDir, new File(absoluteFilePath).getName());
+
+            boolean isSuccess = false;
+            for (String hostAndPort : hostInfos) {
+                try {
+                    // Parsed inside the try so one malformed entry falls through to the next endpoint;
+                    // splitting on the last ':' keeps hosts that themselves contain ':' intact.
+                    int separator = hostAndPort.lastIndexOf(':');
+                    String host = hostAndPort.substring(0, separator);
+                    int port = Integer.parseInt(hostAndPort.substring(separator + 1));
+                    downloadFromHost(conf, absoluteFilePath, localFile.getAbsolutePath(), host, port);
+                    downloadedFiles.add(localFile);
+                    isSuccess = true;
+                    break;
+                } catch (Exception e) {
+                    LOG.error("download failed from {}, will try another endpoint ", hostAndPort, e);
+                }
+            }
+
+            if (!isSuccess) {
+                throw new RuntimeException("File " + absoluteFilePath + " could not be downloaded from any endpoint");
+            }
+        }
+
+        return downloadedFiles;
+    }
+
+    /**
+     * Returns the number of children of {@code /code-distributor/<topologyId>} in ZooKeeper
+     * (presumably one per host holding the code; TODO confirm against the registering side).
+     */
+    @Override
+    public short getReplicationCount(String topologyId) throws Exception {
+        return (short) zkClient.getChildren().forPath("/code-distributor/" + topologyId).size();
+    }
+
+    /** Nothing to release per topology for this implementation. */
+    @Override
+    public void cleanup(String topologyid) throws IOException {
+        //no op.
+    }
+
+    /** Closes the ZooKeeper client, if {@link #prepare(Map)} created one. */
+    @Override
+    public void close(Map conf) {
+        if (zkClient != null) {
+            zkClient.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/nimbus/ICodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/nimbus/ICodeDistributor.java b/storm-core/src/jvm/backtype/storm/nimbus/ICodeDistributor.java
deleted file mode 100644
index 8189179..0000000
--- a/storm-core/src/jvm/backtype/storm/nimbus/ICodeDistributor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package backtype.storm.nimbus;
-
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Interface responsible to distribute code in the cluster.
- * The implementation is selected through the storm.codedistributor.class setting.
- */
-public interface ICodeDistributor {
- /**
- * Prepare this code distributor.
- * @param conf the storm configuration
- * @throws Exception if the distributor cannot be initialized
- */
- void prepare(Map conf) throws Exception;
-
- /**
- * This API will perform the actual upload of the code to the distribution implementation.
- * The API should return a Meta file which should have enough information for downloader
- * so it can download the code e.g. for bittorrent it will be a torrent file, in case of something like HDFS or s3
- * it might have the actual directory where all the code is put.
- * @param dirPath directory where all the code to be distributed exists.
- * @param topologyId the topologyId for which the meta file needs to be created.
- * @return the meta file to pass to download
- * @throws Exception if the upload fails
- */
- File upload(String dirPath, String topologyId) throws Exception;
-
- /**
- * Given the topologyId and metafile, download the actual code and return the downloaded file's list.
- * @param topologyid the topology whose code should be downloaded
- * @param metafile the meta file produced by upload
- * @return the downloaded files
- * @throws Exception if the code cannot be downloaded
- */
- List<File> download(String topologyid, File metafile) throws Exception;
-
- /**
- * returns number of nodes to which the code is already replicated for the topology.
- * @param topologyId the topology to check
- * @return the replication count
- * @throws Exception if the count cannot be determined
- */
- short getReplicationCount(String topologyId) throws Exception;
-
- /**
- * Performs the cleanup.
- * @param topologyid the topology whose distribution resources should be released
- * @throws IOException if the cleanup fails
- */
- void cleanup(String topologyid) throws IOException;
-
- /**
- * Close this distributor.
- * @param conf the storm configuration
- */
- void close(Map conf);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java b/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java
deleted file mode 100644
index bcf0167..0000000
--- a/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package backtype.storm.nimbus;
-
-import backtype.storm.Config;
-import backtype.storm.utils.ZookeeperAuthInfo;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static backtype.storm.Config.*;
-import static backtype.storm.utils.Utils.downloadFromHost;
-import static backtype.storm.utils.Utils.newCurator;
-
-
-/**
- * ICodeDistributor that serves code from the local file system of the hosts holding it.
- * upload() writes a meta file listing the absolute paths of the code files; download() fetches each
- * listed file from an endpoint found as a "host:port" child of /code-distributor/<topologyId> in ZooKeeper.
- */
-public class LocalFileSystemCodeDistributor implements ICodeDistributor {
- private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystemCodeDistributor.class);
- private CuratorFramework zkClient;
- private Map conf;
-
- /**
- * Stores conf and starts a Curator client for the configured ZooKeeper servers/root.
- */
- @Override
- public void prepare(Map conf) throws Exception {
- this.conf = conf;
- List<String> zkServers = (List<String>) conf.get(STORM_ZOOKEEPER_SERVERS);
- int port = (Integer) conf.get(STORM_ZOOKEEPER_PORT);
- ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
- zkClient = newCurator(conf, zkServers, port, (String) conf.get(STORM_ZOOKEEPER_ROOT), zkAuthInfo);
- zkClient.start();
- }
-
- /**
- * Writes storm-code-distributor.meta into dirPath listing the absolute path of every file there.
- * An existing meta file is not rewritten.
- */
- @Override
- public File upload(String dirPath, String topologyId) throws Exception {
- // NOTE(review): 'files' is never used.
- ArrayList<File> files = new ArrayList<File>();
- File destDir = new File(dirPath);
- // NOTE(review): listFiles() returns null if dirPath is missing, which makes the loop below throw an NPE.
- File[] localFiles = destDir.listFiles();
-
- List<String> filePaths = new ArrayList<String>(3);
- for (File file : localFiles) {
- filePaths.add(file.getAbsolutePath());
- }
-
- File metaFile = new File(destDir, "storm-code-distributor.meta");
- boolean isCreated = metaFile.createNewFile();
- if (isCreated) {
- FileUtils.writeLines(metaFile, filePaths);
- } else {
- LOG.warn("metafile " + metaFile.getAbsolutePath() + " already exists.");
- }
-
- // Logged whether or not the meta file was newly created.
- LOG.info("Created meta file " + metaFile.getAbsolutePath() + " upload successful.");
-
- return metaFile;
- }
-
- /**
- * Downloads every file listed in metafile into the meta file's parent directory, trying each
- * ZooKeeper-registered endpoint in order; throws RuntimeException if a file fails on all of them.
- */
- @Override
- public List<File> download(String topologyid, File metafile) throws Exception {
- List<String> hostInfos = zkClient.getChildren().forPath("/code-distributor/" + topologyid);
- File destDir = metafile.getParentFile();
- List<File> downloadedFiles = Lists.newArrayList();
- for (String absoluteFilePath : FileUtils.readLines(metafile)) {
-
- File localFile = new File(destDir, new File(absoluteFilePath).getName());
-
- boolean isSuccess = false;
- for (String hostAndPort : hostInfos) {
- // NOTE(review): parsing happens outside the try, so a malformed "host:port" entry aborts the whole download.
- String host = hostAndPort.split(":")[0];
- int port = Integer.parseInt(hostAndPort.split(":")[1]);
- try {
- downloadFromHost(conf, absoluteFilePath, localFile.getAbsolutePath(), host, port);
- downloadedFiles.add(localFile);
- isSuccess = true;
- break;
- } catch (Exception e) {
- LOG.error("download failed from {}:{}, will try another endpoint ", host, port, e);
- }
- }
-
- if(!isSuccess) {
- throw new RuntimeException("File " + absoluteFilePath +" could not be downloaded from any endpoint");
- }
- }
-
- return downloadedFiles;
- }
-
- /**
- * Returns the number of children under /code-distributor/<topologyId> in ZooKeeper.
- */
- @Override
- public short getReplicationCount(String topologyId) throws Exception {
- return (short) zkClient.getChildren().forPath("/code-distributor/" + topologyId).size();
- }
-
- @Override
- public void cleanup(String topologyid) throws IOException {
- //no op.
- }
-
- /**
- * Closes the ZooKeeper client created in prepare().
- */
- @Override
- public void close(Map conf) {
- zkClient.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java b/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
deleted file mode 100644
index a0d2fc8..0000000
--- a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java
+++ /dev/null
@@ -1,191 +0,0 @@
-package backtype.storm.torrent;
-
-import backtype.storm.Config;
-import backtype.storm.nimbus.ICodeDistributor;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Shorts;
-import com.turn.ttorrent.client.Client;
-import com.turn.ttorrent.client.SharedTorrent;
-import com.turn.ttorrent.common.Torrent;
-import com.turn.ttorrent.tracker.TrackedTorrent;
-import com.turn.ttorrent.tracker.Tracker;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.*;
-
-/**
- * ICodeDistributor that distributes topology code over BitTorrent (ttorrent). prepare() starts an embedded
- * tracker; upload() creates, announces and seeds a torrent for a code directory; download() fetches the code
- * described by a torrent file. Configured bandwidth limits are split evenly across the active clients.
- */
-public class BitTorrentCodeDistributor implements ICodeDistributor {
- private static final Logger LOG = LoggerFactory.getLogger(BitTorrentCodeDistributor.class);
- private Tracker tracker;
- private String hostName;
- private InetSocketAddress address;
- private Integer port;
- // One ttorrent client per topology, keyed by topologyId.
- protected HashMap<String, Client> clients = new HashMap<String, Client>();
- protected Double maxDownload;
- protected Double maxUpload;
- private Integer seedDuration;
-
- /**
- * Reads the BitTorrent settings from conf and starts the embedded tracker.
- */
- @Override
- public void prepare(Map conf) throws Exception {
- this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
- this.port = (Integer) conf.get(Config.BITTORRENT_PORT);
- this.maxDownload = (Double) conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE);
- this.maxUpload = (Double) conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE);
- this.seedDuration = (Integer) conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION);
-
- LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxUpload), format(this.maxDownload));
-
- LOG.info("Starting bt tracker bound to hostname '{}'", hostName);
- //using "0.0.0.0" to ensure we bind to all IPV4 network interfaces.
- this.address = new InetSocketAddress("0.0.0.0", port);
-
- this.tracker = new Tracker(address);
- LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl());
- this.tracker.start();
- }
-
- /**
- * Creates a torrent for dirPath, saves it as storm-code-distributor.meta, announces it and starts seeding.
- */
- @Override
- public File upload(String dirPath, String topologyId) throws Exception {
- File destDir = new File(dirPath);
- LOG.info("Generating torrent for directory: {}", destDir.getAbsolutePath());
-
- URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce");
- LOG.info("Creating torrent with announce URL: {}", uri);
-
- //TODO: why does listing the directory not work?
- ArrayList<File> files = new ArrayList<File>();
- files.add(new File(destDir, "stormjar.jar"));
- files.add(new File(destDir, "stormconf.ser"));
- files.add(new File(destDir, "stormcode.ser"));
-
- Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus");
- File torrentFile = new File(destDir, "storm-code-distributor.meta");
- // NOTE(review): this FileOutputStream is never closed.
- torrent.save(new FileOutputStream(torrentFile));
- LOG.info("Saved torrent: {}", torrentFile.getAbsolutePath());
- this.tracker.announce(new TrackedTorrent(torrent));
-
- Client client = new Client(getInetAddress(), new SharedTorrent(torrent, destDir.getParentFile(), true));
- this.clients.put(topologyId, client);
- rebalanceRates();
- client.share();
- LOG.info("Seeding torrent...");
-
- /**
- * Every time on prepare we need to call tracker.announce for all torrents that
- * exist in the file system, otherwise the tracker will reject any peer request
- * with unknown torrents. You need to bootstrap trackers.
- */
- return torrentFile;
- }
-
- /**
- * Downloads the code described by torrentFile, then moves it from <parent>/<topologyId>/ up into the
- * torrent file's parent directory and returns that directory's files.
- */
- @Override
- public List<File> download(String topologyId, File torrentFile) throws Exception {
- LOG.info("Initiating BitTorrent download.");
-
- File destDir = torrentFile.getParentFile();
- LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath());
- LOG.info("Saving files to directory: {}", destDir.getAbsolutePath());
- SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir);
- Client client = new Client(getInetAddress(), st);
- this.clients.put(topologyId, client);
- rebalanceRates();
- client.share(this.seedDuration);
-
- //TODO: Should have a timeout after which we just fail the supervisor.
- if (this.seedDuration == 0) {
- client.waitForCompletion();
- } else {
- LOG.info("Waiting for seeding to begin...");
- while (client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- // NOTE(review): the interrupt is swallowed and the loop keeps polling.
- }
- }
- }
- // NOTE(review): reached even if the client ended in ClientState.ERROR.
- LOG.info("BitTorrent download complete.");
-
- /**
- * This should not be needed. currently the bittorrent library uses the torrent name (which is topologyId)
- * as the folder name and downloads all the files under that folder. so we need to either download
- * the torrent files under /storm-local/supervisor/stormdist or nimbus/stormdist/ to ensure stormdist becomes
- * the parent of all torrent files and the actual code will be downloaded under stormdist/topologyId/.
- * Ideally we should be able to specify that the downloaded files must be downloaded under
- * given folder only and no extra folder needs to be created.
- */
-
- // NOTE(review): listFiles() returns null if srcDir is missing, which would throw an NPE here.
- File srcDir = new File(destDir, topologyId);
- for (File file : srcDir.listFiles()) {
- FileUtils.copyFileToDirectory(file, destDir);
- file.delete();
- }
- srcDir.delete();
-
- return Lists.newArrayList(destDir.listFiles());
- }
-
- /**
- * Returns the first address of hostName that is not a wildcard, loopback or multicast address.
- */
- private InetAddress getInetAddress() throws UnknownHostException {
- for (InetAddress addr : InetAddress.getAllByName(this.hostName)) {
- if (!addr.isAnyLocalAddress() && !addr.isLoopbackAddress() && !addr.isMulticastAddress()) {
- return addr;
- }
- }
-
- throw new RuntimeException("No valid InetAddress could be obtained, something really wrong with network configuration.");
- }
-
- /**
- * Returns the tracker's seeder count for the torrent named topologyId, or 0 if the tracker does not know it.
- */
- @Override
- public short getReplicationCount(String topologyId) {
- Collection<TrackedTorrent> trackedTorrents = tracker.getTrackedTorrents();
- for (final TrackedTorrent trackedTorrent : trackedTorrents) {
- if (trackedTorrent.getName().equals(topologyId)) {
- return Shorts.checkedCast(trackedTorrent.seeders());
- }
- }
-
- LOG.warn("No torrent found in tracker for topologyId = " + topologyId);
- return 0;
- }
-
- /**
- * Stops the topology's client (if any), removes its torrent from the tracker and rebalances bandwidth.
- */
- @Override
- public void cleanup(String topologyId) {
- LOG.info("Stop seeding/tracking for topology {}", topologyId);
- Client client = this.clients.remove(topologyId);
- if (client != null) {
- Torrent torrent = client.getTorrent();
- client.stop();
- this.tracker.remove(torrent);
- }
- rebalanceRates();
- }
-
- /**
- * Stops the embedded tracker.
- * NOTE(review): clients still in 'clients' are not stopped here.
- */
- @Override
- public void close(Map conf) {
- this.tracker.stop();
- }
-
- /**
- * Splits the configured max rates evenly across all clients; a non-positive limit (unlimited) is passed through.
- */
- private synchronized void rebalanceRates() {
- int clientCount = this.clients.size();
- if (clientCount > 0) {
- double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount;
- double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount;
- LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount);
- LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl));
- for (Client client : this.clients.values()) {
- client.setMaxDownloadRate(maxDl);
- client.setMaxUploadRate(maxUl);
- }
- }
- }
-
- // Formats a rate for logging; non-positive values are shown as "UNLIMITED".
- private static String format(double val) {
- return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val);
- }
-}