You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/04 20:49:46 UTC

[6/8] storm git commit: added polling to read from disk every 5 secs

added polling to read from disk every 5 secs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/51ae9135
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/51ae9135
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/51ae9135

Branch: refs/heads/master
Commit: 51ae9135779b25b1014a80021302758c73432cfb
Parents: f5829c2
Author: Sanket <sc...@untilservice-lm>
Authored: Wed Dec 16 12:49:00 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Wed Dec 16 12:49:00 2015 -0600

----------------------------------------------------------------------
 .../starter/BlobStoreAPIWordCountTopology.java  | 32 +++++++++++++++-----
 1 file changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/51ae9135/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index 803b042..250c418 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -125,18 +125,31 @@ public class BlobStoreAPIWordCountTopology {
     }
 
     public static class FilterWords extends BaseBasicBolt {
-        String fileName = "blacklist.txt";
+        boolean poll = false;
+        long pollTime;
+        Set<String> wordSet;
         @Override
         public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            // Re-read the blacklist file at most once every 5 seconds to refresh the
+            // wordSet that the FilterWords bolt uses to filter the words
             try {
-                String word = tuple.getString(0);
-                Set<String> wordSet = parseFile(fileName);
-                if (!wordSet.contains(word)) {
-                    collector.emit(new Values(word));
+                if (!poll) {
+                    wordSet = parseFile(fileName);
+                    pollTime = System.currentTimeMillis();
+                    poll = true;
+                } else {
+                    if ((System.currentTimeMillis() - pollTime) > 5000) {
+                        wordSet = parseFile(fileName);
+                        pollTime = System.currentTimeMillis();
+                    }
                 }
             } catch (IOException exp) {
                 throw new RuntimeException(exp);
             }
+            if (wordSet !=null && !wordSet.contains(word)) {
+                collector.emit(new Values(word));
+            }
         }
 
         @Override
@@ -213,14 +226,16 @@ public class BlobStoreAPIWordCountTopology {
         while (tokens.hasMoreElements()) {
             wordSet.add(tokens.nextToken());
         }
-        LOG.info("parseFile {}", wordSet);
+        LOG.debug("parseFile {}", wordSet);
         return wordSet;
     }
 
     private static StringBuilder readFile(File file) throws IOException {
         String line;
         StringBuilder fileContent = new StringBuilder();
-        // Do not use canonical file name here
+        // Do not use canonical file name here as we are using
+        // symbolic links to read file data and performing atomic move
+        // while updating files
         BufferedReader br = new BufferedReader(new FileReader(file));
         while ((line = br.readLine()) != null) {
             fileContent.append(line);
@@ -237,7 +252,6 @@ public class BlobStoreAPIWordCountTopology {
             file.createNewFile();
         }
         writeToFile(file, getRandomWordSet());
-        LOG.info(readFile(file).toString());
         return file;
     }
 
@@ -266,12 +280,14 @@ public class BlobStoreAPIWordCountTopology {
             File file = createFile(fileName);
             // Creating blob again before launching topology
             createBlobWithContent(key, store, file);
+
             // Blostore launch command with topology blobstore map
             // Here we are giving it a local name so that we can read from the file
             // bin/storm jar examples/storm-starter/storm-starter-topologies-0.11.0-SNAPSHOT.jar
             // storm.starter.BlobStoreAPIWordCountTopology bl -c
             // topology.blobstore.map='{"key":{"localname":"blacklist.txt", "uncompress":"false"}}'
             wc.buildAndLaunchWordCountTopology(args);
+
             // Updating file few times every 5 seconds
             for(int i=0; i<10; i++) {
                 updateBlobWithContent(key, store, updateFile(file));