You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/03/20 22:01:08 UTC
[03/50] [abbrv] storm git commit: Local file system based code
distributor initial implementation.
Local file system based code distributor initial implementation.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88e70a81
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88e70a81
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88e70a81
Branch: refs/heads/0.11.x-branch
Commit: 88e70a81d6e2715059276431d9dd19ec5763786a
Parents: 02c27a9
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Dec 16 17:25:12 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Dec 16 17:25:12 2014 -0800
----------------------------------------------------------------------
.../nimbus/LocalFileSystemCodeDistributor.java | 106 +++++++++++++++++++
1 file changed, 106 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/88e70a81/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java b/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java
new file mode 100644
index 0000000..a379983
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java
@@ -0,0 +1,106 @@
+package backtype.storm.nimbus;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static backtype.storm.Config.*;
+import static backtype.storm.utils.Utils.downloadFromHost;
+import static backtype.storm.utils.Utils.newCurator;
+
+/**
+ * {@link ICodeDistributor} implementation backed by the local file system.
+ *
+ * <p>{@link #upload} writes a meta file listing the absolute paths of the files found in a
+ * directory. {@link #download} reads such a meta file and fetches every listed file from one of
+ * the {@code host:port} endpoints registered as ZooKeeper children of
+ * {@code /code-distributor/<topologyId>}.
+ *
+ * <p>{@link #prepare} must be called before any method that talks to ZooKeeper.
+ */
+public class LocalFileSystemCodeDistributor implements ICodeDistributor {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystemCodeDistributor.class);
+
+    /** ZooKeeper client; {@code null} until {@link #prepare} has succeeded. */
+    private CuratorFramework zkClient;
+
+    /** Configuration handed to {@link #prepare}; forwarded to the download helper. */
+    private Map conf;
+
+    /**
+     * Stores the configuration and starts a ZooKeeper client built from it.
+     *
+     * @param conf configuration map; must contain the ZooKeeper servers, port and root entries
+     * @throws IllegalArgumentException if the configured ZooKeeper port is missing or not numeric
+     * @throws Exception if the client cannot be created or started
+     */
+    @Override
+    public void prepare(Map conf) throws Exception {
+        this.conf = conf;
+        @SuppressWarnings("unchecked") // the servers entry is expected to be a list of host names
+        List<String> zkServers = (List<String>) conf.get(STORM_ZOOKEEPER_SERVERS);
+
+        // Accept any Number rather than only Integer so that a port parsed as e.g. Long does not
+        // fail with a ClassCastException, and report a missing value clearly instead of an NPE.
+        Object portValue = conf.get(STORM_ZOOKEEPER_PORT);
+        if (!(portValue instanceof Number)) {
+            throw new IllegalArgumentException(
+                    STORM_ZOOKEEPER_PORT + " must be set to a number but was: " + portValue);
+        }
+        int port = ((Number) portValue).intValue();
+
+        zkClient = newCurator(conf, zkServers, port, (String) conf.get(STORM_ZOOKEEPER_ROOT));
+        zkClient.start();
+        //TODO secure zk.
+    }
+
+    /**
+     * Writes a meta file named {@code storm-code-distributor.meta} into {@code dirPath}, containing
+     * one absolute path per line for every entry currently in that directory.
+     *
+     * <p>If the meta file already exists it is left untouched and a warning is logged.
+     *
+     * @param dirPath directory holding the files to publish
+     * @param topologyId topology identifier (not used by this implementation)
+     * @return the meta file, whether newly created or pre-existing
+     * @throws IOException if {@code dirPath} cannot be listed or the meta file cannot be written
+     */
+    @Override
+    public File upload(String dirPath, String topologyId) throws Exception {
+        File codeDir = new File(dirPath);
+
+        // listFiles() returns null (not an empty array) when the path is not a directory or an
+        // I/O error occurs; fail with a descriptive message instead of an NPE in the loop below.
+        File[] localFiles = codeDir.listFiles();
+        if (localFiles == null) {
+            throw new IOException("Unable to list files in " + codeDir.getAbsolutePath()
+                    + ": not a directory or not readable");
+        }
+
+        // The listing is taken before the meta file is created, so a freshly created meta file
+        // never lists itself.
+        List<String> filePaths = new ArrayList<String>(localFiles.length);
+        for (File file : localFiles) {
+            filePaths.add(file.getAbsolutePath());
+        }
+
+        File metaFile = new File(codeDir, "storm-code-distributor.meta");
+        if (metaFile.createNewFile()) {
+            FileUtils.writeLines(metaFile, filePaths);
+            LOG.info("Created meta file {} upload successful.", metaFile.getAbsolutePath());
+        } else {
+            LOG.warn("metafile {} already exists.", metaFile.getAbsolutePath());
+        }
+
+        return metaFile;
+    }
+
+    /**
+     * Downloads every file listed in {@code metafile} into the meta file's parent directory.
+     *
+     * <p>For each listed path the endpoints registered in ZooKeeper are tried in order until one
+     * succeeds. Malformed endpoint entries are skipped.
+     *
+     * @param topologyid topology whose registered endpoints should be used
+     * @param metafile meta file with one remote absolute path per line
+     * @return the local files that were downloaded, in meta-file order
+     * @throws RuntimeException if a listed file could not be fetched from any endpoint
+     * @throws Exception if ZooKeeper or the meta file cannot be read
+     */
+    @Override
+    public List<File> download(String topologyid, File metafile) throws Exception {
+        List<String> hostInfos = zkClient.getChildren().forPath("/code-distributor/" + topologyid);
+        File destDir = metafile.getParentFile();
+        List<File> downloadedFiles = Lists.newArrayList();
+
+        for (String absoluteFilePath : FileUtils.readLines(metafile)) {
+            // Only the file name is kept locally; the remote directory layout is not reproduced.
+            File localFile = new File(destDir, new File(absoluteFilePath).getName());
+
+            boolean isSuccess = false;
+            Exception lastFailure = null;
+            for (String hostAndPort : hostInfos) {
+                String[] hostPort = hostAndPort.split(":");
+                if (hostPort.length < 2) {
+                    // A bad registration must not prevent trying the remaining endpoints.
+                    LOG.warn("Skipping malformed endpoint {}, expected host:port", hostAndPort);
+                    continue;
+                }
+                String host = hostPort[0];
+                try {
+                    // Parsed inside the try so a non-numeric port is handled like any other
+                    // per-endpoint failure.
+                    int port = Integer.parseInt(hostPort[1]);
+                    downloadFromHost(conf, absoluteFilePath, localFile.getAbsolutePath(), host, port);
+                    downloadedFiles.add(localFile);
+                    isSuccess = true;
+                    break;
+                } catch (Exception e) {
+                    lastFailure = e;
+                    LOG.error("download failed from {}:{}, will try another endpoint ", host, hostPort[1], e);
+                }
+            }
+
+            if (!isSuccess) {
+                // Keep the last endpoint failure as the cause (null when no endpoint was tried).
+                throw new RuntimeException(
+                        "File " + absoluteFilePath + " could not be downloaded from any endpoint", lastFailure);
+            }
+        }
+
+        return downloadedFiles;
+    }
+
+    /**
+     * Returns the number of endpoints registered in ZooKeeper for the topology.
+     *
+     * @param topologyId topology to look up
+     * @return the child count of {@code /code-distributor/<topologyId>}, capped at
+     *     {@link Short#MAX_VALUE}
+     * @throws Exception if the ZooKeeper node cannot be read
+     */
+    @Override
+    public short getReplicationCount(String topologyId) throws Exception {
+        int replicas = zkClient.getChildren().forPath("/code-distributor/" + topologyId).size();
+        // Clamp instead of a plain narrowing cast, which would wrap to a negative value.
+        return (short) Math.min(replicas, Short.MAX_VALUE);
+    }
+
+    /**
+     * No-op: this implementation keeps no per-topology state of its own to clean up.
+     *
+     * @param topologyid topology identifier (ignored)
+     */
+    @Override
+    public void cleanup(String topologyid) throws IOException {
+        //no op.
+    }
+
+    /**
+     * Closes the ZooKeeper client if {@link #prepare} created one.
+     *
+     * @param conf configuration map (ignored)
+     */
+    @Override
+    public void close(Map conf) {
+        // prepare() may never have run or may have failed before the client was assigned.
+        if (zkClient != null) {
+            zkClient.close();
+        }
+    }
+}