You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/04 20:49:44 UTC

[4/8] storm git commit: created an alias to read from disk on worker

created an alias to read from disk on worker


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b04eb6d8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b04eb6d8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b04eb6d8

Branch: refs/heads/master
Commit: b04eb6d834c5337f7bc92790aa0cef54f5ac75c2
Parents: 3a62795
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 15 21:20:27 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 15 21:20:27 2015 -0600

----------------------------------------------------------------------
 .../starter/BlobStoreAPIWordCountTopology.java  | 61 ++++++++------------
 1 file changed, 23 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b04eb6d8/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index 5877f52..6f3e441 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -19,7 +19,6 @@ package storm.starter;
 
 import backtype.storm.Config;
 import backtype.storm.StormSubmitter;
-import backtype.storm.LocalCluster;
 import backtype.storm.blobstore.AtomicOutputStream;
 import backtype.storm.blobstore.ClientBlobStore;
 import backtype.storm.blobstore.InputStreamWithMeta;
@@ -56,7 +55,6 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -128,16 +126,16 @@ public class BlobStoreAPIWordCountTopology {
 
     public static class FilterWords extends BaseBasicBolt {
         String key = "key";
+        String fileName = "blacklist.txt";
         @Override
         public void execute(Tuple tuple, BasicOutputCollector collector) {
             try {
-                ClientBlobStore store = Utils.getClientBlobStore(Utils.readStormConfig());
                 String word = tuple.getString(0);
-                Set<String> wordSet = parseBlobContent(getBlobContent(key, store).toString());
+                Set<String> wordSet = parseFile(fileName);
                 if (!wordSet.contains(word)) {
                     collector.emit(new Values(word));
                 }
-            } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
+            } catch (IOException exp) {
                 throw new RuntimeException(exp);
             }
         }
@@ -157,17 +155,9 @@ public class BlobStoreAPIWordCountTopology {
         Config conf = new Config();
         conf.setDebug(true);
         try {
-            if (args != null && args.length > 0) {
-                conf.setNumWorkers(3);
-                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-            } else {
-                conf.setMaxTaskParallelism(3);
-                LocalCluster cluster = new LocalCluster();
-                cluster.submitTopology("word-count", conf, builder.createTopology());
-                Thread.sleep(10000);
-                cluster.shutdown();
-            }
-        } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException | InterruptedException exp) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+        } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException exp) {
             throw new RuntimeException(exp);
         }
     }
@@ -176,7 +166,6 @@ public class BlobStoreAPIWordCountTopology {
     // storm blobstore create --file blacklist.txt --acl o::rwa key
     private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
             throws AuthorizationException, KeyAlreadyExistsException, IOException,KeyNotFoundException {
-        // For more on how to set acls while creating the blob please look at distcache-blobstore.md documentation
         String stringBlobACL = "o::rwa";
         AccessControl blobACL = BlobStoreAclHandler.parseAccessControl(stringBlobACL);
         List<AccessControl> acls = new LinkedList<AccessControl>();
@@ -187,21 +176,6 @@ public class BlobStoreAPIWordCountTopology {
         blobStream.close();
     }
 
-    // Equivalent read command on command line
-    // storm blobstore cat --file blacklist.txt key
-    private static StringBuilder getBlobContent(String blobKey, ClientBlobStore clientBlobStore)
-            throws AuthorizationException, KeyNotFoundException, IOException {
-        InputStreamWithMeta blobInputStream = clientBlobStore.getBlob(blobKey);
-        BufferedReader br = new BufferedReader(new InputStreamReader(blobInputStream));
-        StringBuilder blobContent = new StringBuilder();
-        String line;
-        while ((line = br.readLine()) != null) {
-            blobContent.append(line);
-            blobContent.append("\n");
-        }
-        return blobContent;
-    }
-
     // Equivalent update command on command line
     // storm blobstore update --file blacklist.txt key
     private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
@@ -230,22 +204,28 @@ public class BlobStoreAPIWordCountTopology {
         return randomWordSet;
     }
 
-    private static Set<String> parseBlobContent(String content) {
-        StringTokenizer tokens = new StringTokenizer(content, "\r\n");
+    private static Set<String> parseFile(String fileName) throws IOException {
+        File file = new File(fileName);
         Set<String> wordSet = new HashSet<>();
+        if (!file.exists()) {
+            return wordSet;
+        }
+        StringTokenizer tokens = new StringTokenizer(readFile(file).toString(), "\r\n");
         while (tokens.hasMoreElements()) {
             wordSet.add(tokens.nextToken());
         }
+        LOG.info("parseFile {}", wordSet);
         return wordSet;
     }
 
     private static StringBuilder readFile(File file) throws IOException {
         String line;
         StringBuilder fileContent = new StringBuilder();
-        BufferedReader br = new BufferedReader(new FileReader(file.getCanonicalFile()));
+        // Do not use canonical file name here
+        BufferedReader br = new BufferedReader(new FileReader(file));
         while ((line = br.readLine()) != null) {
             fileContent.append(line);
-            fileContent.append("\n");
+            fileContent.append(System.lineSeparator());
         }
         return fileContent;
     }
@@ -270,11 +250,12 @@ public class BlobStoreAPIWordCountTopology {
 
     // Writing random words to be blacklisted
     public static void writeToFile(File file, Set<String> content) throws IOException{
-        FileWriter fw = new FileWriter(file.getCanonicalPath(), false);
+        FileWriter fw = new FileWriter(file, false);
         BufferedWriter bw = new BufferedWriter(fw);
         Iterator<String> iter = content.iterator();
         while(iter.hasNext()) {
             bw.write(iter.next());
+            bw.write(System.lineSeparator());
         }
         bw.close();
     }
@@ -286,7 +267,11 @@ public class BlobStoreAPIWordCountTopology {
             File file = createFile(fileName);
             // Creating blob again before launching topology
             createBlobWithContent(key, store, file);
-            // Launching word count topology with blob updates
+            // Blobstore launch command with topology blobstore map
+            // Here we are giving it a local name so that we can read from the file
+            // bin/storm jar examples/storm-starter/storm-starter-topologies-0.11.0-SNAPSHOT.jar
+            // storm.starter.BlobStoreAPIWordCountTopology bl -c
+            // topology.blobstore.map='{"key":{"localname":"blacklist.txt", "uncompress":"false"}}'
             wc.buildAndLaunchWordCountTopology(args);
             // Updating file few times every 5 seconds
             for(int i=0; i<10; i++) {