You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/03/20 22:01:08 UTC

[03/50] [abbrv] storm git commit: Local file system based code distributor initial implementation.

Local file system based code distributor initial implementation.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88e70a81
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88e70a81
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88e70a81

Branch: refs/heads/0.11.x-branch
Commit: 88e70a81d6e2715059276431d9dd19ec5763786a
Parents: 02c27a9
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Dec 16 17:25:12 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Dec 16 17:25:12 2014 -0800

----------------------------------------------------------------------
 .../nimbus/LocalFileSystemCodeDistributor.java  | 106 +++++++++++++++++++
 1 file changed, 106 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/88e70a81/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java b/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java
new file mode 100644
index 0000000..a379983
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java
@@ -0,0 +1,106 @@
+package backtype.storm.nimbus;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static backtype.storm.Config.*;
+import static backtype.storm.utils.Utils.downloadFromHost;
+import static backtype.storm.utils.Utils.newCurator;
+
+/**
+ * Created by pbrahmbhatt on 12/12/14.
+ */
+public class LocalFileSystemCodeDistributor implements ICodeDistributor {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystemCodeDistributor.class);
+    private CuratorFramework zkClient;
+    private Map conf;
+
+    @Override
+    public void prepare(Map conf) throws Exception {
+        this.conf = conf;
+        List<String> zkServers = (List<String>) conf.get(STORM_ZOOKEEPER_SERVERS);
+        int port = (Integer) conf.get(STORM_ZOOKEEPER_PORT);
+        zkClient = newCurator(conf, zkServers, port, (String) conf.get(STORM_ZOOKEEPER_ROOT));
+        zkClient.start();
+        //TODO secure zk.
+    }
+
+    @Override
+    public File upload(String dirPath, String topologyId) throws Exception {
+        ArrayList<File> files = new ArrayList<File>();
+        File destDir = new File(dirPath);
+        File[] localFiles = destDir.listFiles();
+
+        List<String> filePaths = new ArrayList<String>(3);
+        for (File file : localFiles) {
+            filePaths.add(file.getAbsolutePath());
+        }
+
+        File metaFile = new File(destDir, "storm-code-distributor.meta");
+        boolean isCreated = metaFile.createNewFile();
+        if (isCreated) {
+            FileUtils.writeLines(metaFile, filePaths);
+        } else {
+            LOG.warn("metafile " + metaFile.getAbsolutePath() + " already exists.");
+        }
+
+        LOG.info("Created meta file " + metaFile.getAbsolutePath() + " upload successful.");
+
+        return metaFile;
+    }
+
+    @Override
+    public List<File> download(String topologyid, File metafile) throws Exception {
+        List<String> hostInfos = zkClient.getChildren().forPath("/code-distributor/" + topologyid);
+        File destDir = metafile.getParentFile();
+        List<File> downloadedFiles = Lists.newArrayList();
+        for (String absoluteFilePath : FileUtils.readLines(metafile)) {
+
+            File localFile = new File(destDir, new File(absoluteFilePath).getName());
+
+            boolean isSuccess = false;
+            for (String hostAndPort : hostInfos) {
+                String host = hostAndPort.split(":")[0];
+                int port = Integer.parseInt(hostAndPort.split(":")[1]);
+                try {
+                    downloadFromHost(conf, absoluteFilePath, localFile.getAbsolutePath(), host, port);
+                    downloadedFiles.add(localFile);
+                    isSuccess = true;
+                    break;
+                } catch (Exception e) {
+                    LOG.error("download failed from {}:{}, will try another endpoint ", host, port, e);
+                }
+            }
+
+            if(!isSuccess) {
+                throw new RuntimeException("File " + absoluteFilePath +" could not be downloaded from any endpoint");
+            }
+        }
+
+        return downloadedFiles;
+    }
+
+    @Override
+    public short getReplicationCount(String topologyId) throws Exception {
+        return (short) zkClient.getChildren().forPath("/code-distributor/" + topologyId).size();
+    }
+
+    @Override
+    public void cleanup(String topologyid) throws IOException {
+        //no op.
+    }
+
+    @Override
+    public void close(Map conf) {
+       zkClient.close();
+    }
+}