You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/04 20:49:44 UTC
[4/8] storm git commit: created an alias to read from disk on worker
created an alias to read from disk on worker
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b04eb6d8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b04eb6d8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b04eb6d8
Branch: refs/heads/master
Commit: b04eb6d834c5337f7bc92790aa0cef54f5ac75c2
Parents: 3a62795
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 15 21:20:27 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 15 21:20:27 2015 -0600
----------------------------------------------------------------------
.../starter/BlobStoreAPIWordCountTopology.java | 61 ++++++++------------
1 file changed, 23 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b04eb6d8/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index 5877f52..6f3e441 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -19,7 +19,6 @@ package storm.starter;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
-import backtype.storm.LocalCluster;
import backtype.storm.blobstore.AtomicOutputStream;
import backtype.storm.blobstore.ClientBlobStore;
import backtype.storm.blobstore.InputStreamWithMeta;
@@ -56,7 +55,6 @@ import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -128,16 +126,16 @@ public class BlobStoreAPIWordCountTopology {
public static class FilterWords extends BaseBasicBolt {
String key = "key";
+ String fileName = "blacklist.txt";
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
- ClientBlobStore store = Utils.getClientBlobStore(Utils.readStormConfig());
String word = tuple.getString(0);
- Set<String> wordSet = parseBlobContent(getBlobContent(key, store).toString());
+ Set<String> wordSet = parseFile(fileName);
if (!wordSet.contains(word)) {
collector.emit(new Values(word));
}
- } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
+ } catch (IOException exp) {
throw new RuntimeException(exp);
}
}
@@ -157,17 +155,9 @@ public class BlobStoreAPIWordCountTopology {
Config conf = new Config();
conf.setDebug(true);
try {
- if (args != null && args.length > 0) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- } else {
- conf.setMaxTaskParallelism(3);
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("word-count", conf, builder.createTopology());
- Thread.sleep(10000);
- cluster.shutdown();
- }
- } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException | InterruptedException exp) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+ } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException exp) {
throw new RuntimeException(exp);
}
}
@@ -176,7 +166,6 @@ public class BlobStoreAPIWordCountTopology {
// storm blobstore create --file blacklist.txt --acl o::rwa key
private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
throws AuthorizationException, KeyAlreadyExistsException, IOException,KeyNotFoundException {
- // For more on how to set acls while creating the blob please look at distcache-blobstore.md documentation
String stringBlobACL = "o::rwa";
AccessControl blobACL = BlobStoreAclHandler.parseAccessControl(stringBlobACL);
List<AccessControl> acls = new LinkedList<AccessControl>();
@@ -187,21 +176,6 @@ public class BlobStoreAPIWordCountTopology {
blobStream.close();
}
- // Equivalent read command on command line
- // storm blobstore cat --file blacklist.txt key
- private static StringBuilder getBlobContent(String blobKey, ClientBlobStore clientBlobStore)
- throws AuthorizationException, KeyNotFoundException, IOException {
- InputStreamWithMeta blobInputStream = clientBlobStore.getBlob(blobKey);
- BufferedReader br = new BufferedReader(new InputStreamReader(blobInputStream));
- StringBuilder blobContent = new StringBuilder();
- String line;
- while ((line = br.readLine()) != null) {
- blobContent.append(line);
- blobContent.append("\n");
- }
- return blobContent;
- }
-
// Equivalent update command on command line
// storm blobstore update --file blacklist.txt key
private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
@@ -230,22 +204,28 @@ public class BlobStoreAPIWordCountTopology {
return randomWordSet;
}
- private static Set<String> parseBlobContent(String content) {
- StringTokenizer tokens = new StringTokenizer(content, "\r\n");
+ private static Set<String> parseFile(String fileName) throws IOException {
+ File file = new File(fileName);
Set<String> wordSet = new HashSet<>();
+ if (!file.exists()) {
+ return wordSet;
+ }
+ StringTokenizer tokens = new StringTokenizer(readFile(file).toString(), "\r\n");
while (tokens.hasMoreElements()) {
wordSet.add(tokens.nextToken());
}
+ LOG.info("parseFile {}", wordSet);
return wordSet;
}
private static StringBuilder readFile(File file) throws IOException {
String line;
StringBuilder fileContent = new StringBuilder();
- BufferedReader br = new BufferedReader(new FileReader(file.getCanonicalFile()));
+ // Do not use canonical file name here
+ BufferedReader br = new BufferedReader(new FileReader(file));
while ((line = br.readLine()) != null) {
fileContent.append(line);
- fileContent.append("\n");
+ fileContent.append(System.lineSeparator());
}
return fileContent;
}
@@ -270,11 +250,12 @@ public class BlobStoreAPIWordCountTopology {
// Writing random words to be blacklisted
public static void writeToFile(File file, Set<String> content) throws IOException{
- FileWriter fw = new FileWriter(file.getCanonicalPath(), false);
+ FileWriter fw = new FileWriter(file, false);
BufferedWriter bw = new BufferedWriter(fw);
Iterator<String> iter = content.iterator();
while(iter.hasNext()) {
bw.write(iter.next());
+ bw.write(System.lineSeparator());
}
bw.close();
}
@@ -286,7 +267,11 @@ public class BlobStoreAPIWordCountTopology {
File file = createFile(fileName);
// Creating blob again before launching topology
createBlobWithContent(key, store, file);
- // Launching word count topology with blob updates
+ // Blobstore launch command with topology blobstore map
+ // Here we are giving it a local name so that we can read from the file
+ // bin/storm jar examples/storm-starter/storm-starter-topologies-0.11.0-SNAPSHOT.jar
+ // storm.starter.BlobStoreAPIWordCountTopology bl -c
+ // topology.blobstore.map='{"key":{"localname":"blacklist.txt", "uncompress":"false"}}'
wc.buildAndLaunchWordCountTopology(args);
// Updating file few times every 5 seconds
for(int i=0; i<10; i++) {