You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/04 20:49:46 UTC
[6/8] storm git commit: added polling to read from disk every 5 secs
added polling to read from disk every 5 secs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/51ae9135
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/51ae9135
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/51ae9135
Branch: refs/heads/master
Commit: 51ae9135779b25b1014a80021302758c73432cfb
Parents: f5829c2
Author: Sanket <sc...@untilservice-lm>
Authored: Wed Dec 16 12:49:00 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Wed Dec 16 12:49:00 2015 -0600
----------------------------------------------------------------------
.../starter/BlobStoreAPIWordCountTopology.java | 32 +++++++++++++++-----
1 file changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/51ae9135/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index 803b042..250c418 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -125,18 +125,31 @@ public class BlobStoreAPIWordCountTopology {
}
public static class FilterWords extends BaseBasicBolt {
- String fileName = "blacklist.txt";
+ boolean poll = false;
+ long pollTime;
+ Set<String> wordSet;
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String word = tuple.getString(0);
+ // Thread Polling every 5 seconds to update the wordSet seconds which is
+ // used in FilterWords bolt to filter the words
try {
- String word = tuple.getString(0);
- Set<String> wordSet = parseFile(fileName);
- if (!wordSet.contains(word)) {
- collector.emit(new Values(word));
+ if (!poll) {
+ wordSet = parseFile(fileName);
+ pollTime = System.currentTimeMillis();
+ poll = true;
+ } else {
+ if ((System.currentTimeMillis() - pollTime) > 5000) {
+ wordSet = parseFile(fileName);
+ pollTime = System.currentTimeMillis();
+ }
}
} catch (IOException exp) {
throw new RuntimeException(exp);
}
+ if (wordSet !=null && !wordSet.contains(word)) {
+ collector.emit(new Values(word));
+ }
}
@Override
@@ -213,14 +226,16 @@ public class BlobStoreAPIWordCountTopology {
while (tokens.hasMoreElements()) {
wordSet.add(tokens.nextToken());
}
- LOG.info("parseFile {}", wordSet);
+ LOG.debug("parseFile {}", wordSet);
return wordSet;
}
private static StringBuilder readFile(File file) throws IOException {
String line;
StringBuilder fileContent = new StringBuilder();
- // Do not use canonical file name here
+ // Do not use canonical file name here as we are using
+ // symbolic links to read file data and performing atomic move
+ // while updating files
BufferedReader br = new BufferedReader(new FileReader(file));
while ((line = br.readLine()) != null) {
fileContent.append(line);
@@ -237,7 +252,6 @@ public class BlobStoreAPIWordCountTopology {
file.createNewFile();
}
writeToFile(file, getRandomWordSet());
- LOG.info(readFile(file).toString());
return file;
}
@@ -266,12 +280,14 @@ public class BlobStoreAPIWordCountTopology {
File file = createFile(fileName);
// Creating blob again before launching topology
createBlobWithContent(key, store, file);
+
// Blostore launch command with topology blobstore map
// Here we are giving it a local name so that we can read from the file
// bin/storm jar examples/storm-starter/storm-starter-topologies-0.11.0-SNAPSHOT.jar
// storm.starter.BlobStoreAPIWordCountTopology bl -c
// topology.blobstore.map='{"key":{"localname":"blacklist.txt", "uncompress":"false"}}'
wc.buildAndLaunchWordCountTopology(args);
+
// Updating file few times every 5 seconds
for(int i=0; i<10; i++) {
updateBlobWithContent(key, store, updateFile(file));