You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/04 20:49:41 UTC
[1/8] storm git commit: blobstore api example
Repository: storm
Updated Branches:
refs/heads/master 57d7cf6e5 -> 59a960552
blobstore api example
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7c972e46
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7c972e46
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7c972e46
Branch: refs/heads/master
Commit: 7c972e460fc171dcfeb045957cd7e56af6d198bc
Parents: 19b8b7d
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 8 12:01:28 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 8 12:01:28 2015 -0600
----------------------------------------------------------------------
.../starter/BlobStoreAPIWordCountTopology.java | 248 +++++++++++++++++++
.../jvm/storm/starter/WordCountTopology.java | 1 -
2 files changed, 248 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7c972e46/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
new file mode 100644
index 0000000..dda038d
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.LocalCluster;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.ClientBlobStore;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.blobstore.NimbusBlobStore;
+
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.ShellBolt;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class BlobStoreAPIWordCountTopology {
+ private static NimbusBlobStore store = new NimbusBlobStore(); // Client API to invoke blob store API functionality
+ private static String key = "key1";
+ private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
+ private static final int READ = 0x01;
+ private static final int WRITE = 0x02;
+ private static final int ADMIN = 0x04;
+ private static final List<AccessControl> WORLD_EVERYTHING =
+ Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN));
+
+ // Spout implementation
+ public static class RandomSentenceSpout extends BaseRichSpout {
+ SpoutOutputCollector _collector;
+ BlobStoreAPIWordCountTopology wc;
+ String key;
+ NimbusBlobStore store;
+
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ _collector = collector;
+ wc = new BlobStoreAPIWordCountTopology();
+ key = "key1";
+ store = new NimbusBlobStore();
+ store.prepare(Utils.readStormConfig());
+ }
+
+ @Override
+ public void nextTuple() {
+ Utils.sleep(100);
+ try {
+ _collector.emit(new Values(wc.getBlobContent(key, store)));
+ } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
+ throw new RuntimeException(exp);
+ }
+ }
+
+ @Override
+ public void ack(Object id) {
+ }
+
+ @Override
+ public void fail(Object id) {
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+ }
+
+ // Bolt implementation
+ public static class SplitSentence extends ShellBolt implements IRichBolt {
+
+ public SplitSentence() {
+ super("python", "splitsentence.py");
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+ }
+
+ public static class WordCount extends BaseBasicBolt {
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String word = tuple.getString(0);
+ Integer count = counts.get(word);
+ if (count == null)
+ count = 0;
+ count++;
+ counts.put(word, count);
+ collector.emit(new Values(word, count));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word", "count"));
+ }
+ }
+
+ public void buildAndLaunchWordCountTopology(String[] args) {
+
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout("spout", new RandomSentenceSpout(), 5);
+
+ builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
+ builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+
+ Config conf = new Config();
+ conf.setDebug(true);
+
+ try {
+ if (args != null && args.length > 0) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+ } else {
+ conf.setMaxTaskParallelism(3);
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("word-count", conf, builder.createTopology());
+
+ Thread.sleep(10000);
+
+ cluster.shutdown();
+ }
+ } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException | InterruptedException exp) {
+ throw new RuntimeException(exp);
+ }
+ }
+
+ private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, SettableBlobMeta settableBlobMeta)
+ throws AuthorizationException, KeyAlreadyExistsException, IOException,KeyNotFoundException {
+ AtomicOutputStream blobStream = clientBlobStore.createBlob(blobKey,settableBlobMeta);
+ blobStream.write(getRandomSentence().getBytes());
+ blobStream.close();
+ }
+
+ private static String getBlobContent(String blobKey, ClientBlobStore clientBlobStore)
+ throws AuthorizationException, KeyNotFoundException, IOException {
+ InputStreamWithMeta blobInputStream = clientBlobStore.getBlob(blobKey);
+ BufferedReader r = new BufferedReader(new InputStreamReader(blobInputStream));
+ return r.readLine();
+ }
+
+ private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore)
+ throws KeyNotFoundException, AuthorizationException, IOException {
+ AtomicOutputStream blobOutputStream = clientBlobStore.updateBlob(blobKey);
+ blobOutputStream.write(getRandomSentence().getBytes());
+ blobOutputStream.close();
+ }
+
+ private static String getRandomSentence() {
+ String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
+ "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
+ String sentence = sentences[new Random().nextInt(sentences.length)];
+ return sentence;
+ }
+
+ private void prepareAndPerformBasicOperations()
+ throws KeyAlreadyExistsException, AuthorizationException, KeyNotFoundException, IOException{
+ Map conf = Utils.readStormConfig();
+ store.prepare(conf);
+ // create blob with content
+ createBlobWithContent(key, store, new SettableBlobMeta(WORLD_EVERYTHING));
+ // read blob content
+ LOG.info(getBlobContent(key, store));
+ // update blob
+ updateBlobWithContent(key, store);
+ LOG.info(getBlobContent(key, store));
+ // delete blob through API
+ store.deleteBlob(key);
+ }
+
+ public static void main(String[] args) {
+ // Basic blob store API calls create, read, update and delete
+ BlobStoreAPIWordCountTopology wc = new BlobStoreAPIWordCountTopology();
+ try {
+ wc.prepareAndPerformBasicOperations();
+ // Creating blob again before launching topology
+ createBlobWithContent(key, store, new SettableBlobMeta(WORLD_EVERYTHING));
+ LOG.info(getBlobContent(key, store));
+ // Launching word count topology with blob updates
+ wc.buildAndLaunchWordCountTopology(args);
+ // Updating blobs few times
+ for (int i = 0; i < 10; i++) {
+ updateBlobWithContent(key, store);
+ Utils.sleep(100);
+ }
+ } catch (KeyAlreadyExistsException kae) {
+ // Do nothing
+ } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
+ throw new RuntimeException(exp);
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/storm/blob/7c972e46/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
index 39184da..7260beb 100644
--- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
@@ -87,7 +87,6 @@ public class WordCountTopology {
Config conf = new Config();
conf.setDebug(true);
-
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
[8/8] storm git commit: Added STORM-1373 to Changelog
Posted by bo...@apache.org.
Added STORM-1373 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/59a96055
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/59a96055
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/59a96055
Branch: refs/heads/master
Commit: 59a9605520de54930fd9ba008b6594f98f2eb63c
Parents: cb6e923
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Jan 4 13:28:44 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Jan 4 13:28:44 2016 -0600
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/59a96055/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a5ef7c2..bb3717b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1373: Blobstore API sample example usage
* STORM-1409: StormClientErrorHandler is not used
* STORM-1411: Some fixes for storm-windowing
* STORM-1399: Blobstore tests should write data to `target` so it gets removed when running `mvn clean`
[5/8] storm git commit: removed unused variable
Posted by bo...@apache.org.
removed unused variable
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f5829c2a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f5829c2a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f5829c2a
Branch: refs/heads/master
Commit: f5829c2a0344c310bb3703b565b433cadf7c15ea
Parents: b04eb6d
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 15 21:22:24 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 15 21:22:24 2015 -0600
----------------------------------------------------------------------
.../src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f5829c2a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index 6f3e441..803b042 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -125,7 +125,6 @@ public class BlobStoreAPIWordCountTopology {
}
public static class FilterWords extends BaseBasicBolt {
- String key = "key";
String fileName = "blacklist.txt";
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
[6/8] storm git commit: added polling to read from disk every 5 secs
Posted by bo...@apache.org.
added polling to read from disk every 5 secs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/51ae9135
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/51ae9135
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/51ae9135
Branch: refs/heads/master
Commit: 51ae9135779b25b1014a80021302758c73432cfb
Parents: f5829c2
Author: Sanket <sc...@untilservice-lm>
Authored: Wed Dec 16 12:49:00 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Wed Dec 16 12:49:00 2015 -0600
----------------------------------------------------------------------
.../starter/BlobStoreAPIWordCountTopology.java | 32 +++++++++++++++-----
1 file changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/51ae9135/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index 803b042..250c418 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -125,18 +125,31 @@ public class BlobStoreAPIWordCountTopology {
}
public static class FilterWords extends BaseBasicBolt {
- String fileName = "blacklist.txt";
+ boolean poll = false;
+ long pollTime;
+ Set<String> wordSet;
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String word = tuple.getString(0);
+ // Thread Polling every 5 seconds to update the wordSet seconds which is
+ // used in FilterWords bolt to filter the words
try {
- String word = tuple.getString(0);
- Set<String> wordSet = parseFile(fileName);
- if (!wordSet.contains(word)) {
- collector.emit(new Values(word));
+ if (!poll) {
+ wordSet = parseFile(fileName);
+ pollTime = System.currentTimeMillis();
+ poll = true;
+ } else {
+ if ((System.currentTimeMillis() - pollTime) > 5000) {
+ wordSet = parseFile(fileName);
+ pollTime = System.currentTimeMillis();
+ }
}
} catch (IOException exp) {
throw new RuntimeException(exp);
}
+ if (wordSet !=null && !wordSet.contains(word)) {
+ collector.emit(new Values(word));
+ }
}
@Override
@@ -213,14 +226,16 @@ public class BlobStoreAPIWordCountTopology {
while (tokens.hasMoreElements()) {
wordSet.add(tokens.nextToken());
}
- LOG.info("parseFile {}", wordSet);
+ LOG.debug("parseFile {}", wordSet);
return wordSet;
}
private static StringBuilder readFile(File file) throws IOException {
String line;
StringBuilder fileContent = new StringBuilder();
- // Do not use canonical file name here
+ // Do not use canonical file name here as we are using
+ // symbolic links to read file data and performing atomic move
+ // while updating files
BufferedReader br = new BufferedReader(new FileReader(file));
while ((line = br.readLine()) != null) {
fileContent.append(line);
@@ -237,7 +252,6 @@ public class BlobStoreAPIWordCountTopology {
file.createNewFile();
}
writeToFile(file, getRandomWordSet());
- LOG.info(readFile(file).toString());
return file;
}
@@ -266,12 +280,14 @@ public class BlobStoreAPIWordCountTopology {
File file = createFile(fileName);
// Creating blob again before launching topology
createBlobWithContent(key, store, file);
+
// Blostore launch command with topology blobstore map
// Here we are giving it a local name so that we can read from the file
// bin/storm jar examples/storm-starter/storm-starter-topologies-0.11.0-SNAPSHOT.jar
// storm.starter.BlobStoreAPIWordCountTopology bl -c
// topology.blobstore.map='{"key":{"localname":"blacklist.txt", "uncompress":"false"}}'
wc.buildAndLaunchWordCountTopology(args);
+
// Updating file few times every 5 seconds
for(int i=0; i<10; i++) {
updateBlobWithContent(key, store, updateFile(file));
[7/8] storm git commit: Merge branch 'storm-blobstore-example' of https://github.com/redsanket/storm into STORM-1373
Posted by bo...@apache.org.
Merge branch 'storm-blobstore-example' of https://github.com/redsanket/storm into STORM-1373
STORM-1373: Blobstore API sample example usage
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cb6e9234
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cb6e9234
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cb6e9234
Branch: refs/heads/master
Commit: cb6e9234bb22523ef8023303a93190a27cf19786
Parents: 57d7cf6 51ae913
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Jan 4 13:28:13 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Jan 4 13:28:13 2016 -0600
----------------------------------------------------------------------
.../starter/BlobStoreAPIWordCountTopology.java | 304 +++++++++++++++++++
.../jvm/storm/starter/WordCountTopology.java | 1 -
2 files changed, 304 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[3/8] storm git commit: made modifications to read content from file
Posted by bo...@apache.org.
made modifications to read content from file
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3a627953
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3a627953
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3a627953
Branch: refs/heads/master
Commit: 3a627953b0c2221c049d7ab0a8c6bd0b3dc1b430
Parents: 2e0c5e5
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 15 10:59:06 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 15 10:59:06 2015 -0600
----------------------------------------------------------------------
.../starter/BlobStoreAPIWordCountTopology.java | 194 ++++++++++++-------
1 file changed, 126 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3a627953/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index e0a4679..5877f52 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -51,46 +51,46 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
+import java.util.StringTokenizer;
public class BlobStoreAPIWordCountTopology {
- private static NimbusBlobStore store = new NimbusBlobStore(); // Client API to invoke blob store API functionality
- private static String key = "key1";
+ private static ClientBlobStore store; // Client API to invoke blob store API functionality
+ private static String key = "key";
+ private static String fileName = "blacklist.txt";
private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
- private static final List<AccessControl> WORLD_EVERYTHING = Arrays.asList(new AccessControl(AccessControlType.OTHER,
- BlobStoreAclHandler.READ | BlobStoreAclHandler.WRITE | BlobStoreAclHandler.ADMIN));
+
+ public static void prepare() {
+ Config conf = new Config();
+ conf.putAll(Utils.readStormConfig());
+ store = Utils.getClientBlobStore(conf);
+ }
// Spout implementation
- public static class BlobStoreSpout extends BaseRichSpout {
+ public static class RandomSentenceSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
- BlobStoreAPIWordCountTopology wc;
- String key;
- NimbusBlobStore store;
-
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
- wc = new BlobStoreAPIWordCountTopology();
- key = "key1";
- store = new NimbusBlobStore();
- store.prepare(Utils.readStormConfig());
}
@Override
public void nextTuple() {
Utils.sleep(100);
- try {
- _collector.emit(new Values(wc.getBlobContent(key, store)));
- } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
- throw new RuntimeException(exp);
- }
+ _collector.emit(new Values(getRandomSentence()));
}
@Override
@@ -126,50 +126,45 @@ public class BlobStoreAPIWordCountTopology {
}
}
- public static class WordCount extends BaseBasicBolt {
- Map<String, Integer> counts = new HashMap<String, Integer>();
-
+ public static class FilterWords extends BaseBasicBolt {
+ String key = "key";
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
- String word = tuple.getString(0);
- Integer count = counts.get(word);
- if (count == null)
- count = 0;
- count++;
- counts.put(word, count);
- collector.emit(new Values(word, count));
+ try {
+ ClientBlobStore store = Utils.getClientBlobStore(Utils.readStormConfig());
+ String word = tuple.getString(0);
+ Set<String> wordSet = parseBlobContent(getBlobContent(key, store).toString());
+ if (!wordSet.contains(word)) {
+ collector.emit(new Values(word));
+ }
+ } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
+ throw new RuntimeException(exp);
+ }
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "count"));
+ declarer.declare(new Fields("word"));
}
}
public void buildAndLaunchWordCountTopology(String[] args) {
-
TopologyBuilder builder = new TopologyBuilder();
-
- builder.setSpout("spout", new BlobStoreSpout(), 5);
-
+ builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
- builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+ builder.setBolt("filter", new FilterWords(), 6).shuffleGrouping("split");
Config conf = new Config();
conf.setDebug(true);
-
try {
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
-
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
-
Thread.sleep(10000);
-
cluster.shutdown();
}
} catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException | InterruptedException exp) {
@@ -177,24 +172,42 @@ public class BlobStoreAPIWordCountTopology {
}
}
- private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, SettableBlobMeta settableBlobMeta)
+ // Equivalent create command on command line
+ // storm blobstore create --file blacklist.txt --acl o::rwa key
+ private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
throws AuthorizationException, KeyAlreadyExistsException, IOException,KeyNotFoundException {
+ // For more on how to set acls while creating the blob please look at distcache-blobstore.md documentation
+ String stringBlobACL = "o::rwa";
+ AccessControl blobACL = BlobStoreAclHandler.parseAccessControl(stringBlobACL);
+ List<AccessControl> acls = new LinkedList<AccessControl>();
+ acls.add(blobACL); // more ACLs can be added here
+ SettableBlobMeta settableBlobMeta = new SettableBlobMeta(acls);
AtomicOutputStream blobStream = clientBlobStore.createBlob(blobKey,settableBlobMeta);
- blobStream.write(getRandomSentence().getBytes());
+ blobStream.write(readFile(file).toString().getBytes());
blobStream.close();
}
- private static String getBlobContent(String blobKey, ClientBlobStore clientBlobStore)
+ // Equivalent read command on command line
+ // storm blobstore cat --file blacklist.txt key
+ private static StringBuilder getBlobContent(String blobKey, ClientBlobStore clientBlobStore)
throws AuthorizationException, KeyNotFoundException, IOException {
InputStreamWithMeta blobInputStream = clientBlobStore.getBlob(blobKey);
- BufferedReader r = new BufferedReader(new InputStreamReader(blobInputStream));
- return r.readLine();
+ BufferedReader br = new BufferedReader(new InputStreamReader(blobInputStream));
+ StringBuilder blobContent = new StringBuilder();
+ String line;
+ while ((line = br.readLine()) != null) {
+ blobContent.append(line);
+ blobContent.append("\n");
+ }
+ return blobContent;
}
- private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore)
+ // Equivalent update command on command line
+ // storm blobstore update --file blacklist.txt key
+ private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
throws KeyNotFoundException, AuthorizationException, IOException {
AtomicOutputStream blobOutputStream = clientBlobStore.updateBlob(blobKey);
- blobOutputStream.write(getRandomSentence().getBytes());
+ blobOutputStream.write(readFile(file).toString().getBytes());
blobOutputStream.close();
}
@@ -205,35 +218,80 @@ public class BlobStoreAPIWordCountTopology {
return sentence;
}
- private void prepareAndPerformBasicOperations()
- throws KeyAlreadyExistsException, AuthorizationException, KeyNotFoundException, IOException{
- Map conf = Utils.readStormConfig();
- store.prepare(conf);
- // create blob with content
- createBlobWithContent(key, store, new SettableBlobMeta(WORLD_EVERYTHING));
- // read blob content
- LOG.info(getBlobContent(key, store));
- // update blob
- updateBlobWithContent(key, store);
- LOG.info(getBlobContent(key, store));
- // delete blob through API
- store.deleteBlob(key);
+ private static Set<String> getRandomWordSet() {
+ Set<String> randomWordSet = new HashSet<>();
+ Random random = new Random();
+ String[] words = new String[]{ "cow", "jumped", "over", "the", "moon", "apple", "day", "doctor", "away",
+ "four", "seven", "ago", "snow", "white", "seven", "dwarfs", "nature", "two" };
+ // Choosing atmost 5 words to update the blacklist file for filtering
+ for (int i=0; i<5; i++) {
+ randomWordSet.add(words[random.nextInt(words.length)]);
+ }
+ return randomWordSet;
+ }
+
+ private static Set<String> parseBlobContent(String content) {
+ StringTokenizer tokens = new StringTokenizer(content, "\r\n");
+ Set<String> wordSet = new HashSet<>();
+ while (tokens.hasMoreElements()) {
+ wordSet.add(tokens.nextToken());
+ }
+ return wordSet;
+ }
+
+ private static StringBuilder readFile(File file) throws IOException {
+ String line;
+ StringBuilder fileContent = new StringBuilder();
+ BufferedReader br = new BufferedReader(new FileReader(file.getCanonicalFile()));
+ while ((line = br.readLine()) != null) {
+ fileContent.append(line);
+ fileContent.append("\n");
+ }
+ return fileContent;
+ }
+
+ // Creating a blacklist file to read from the disk
+ public static File createFile(String fileName) throws IOException {
+ File file = null;
+ file = new File(fileName);
+ if (!file.exists()) {
+ file.createNewFile();
+ }
+ writeToFile(file, getRandomWordSet());
+ LOG.info(readFile(file).toString());
+ return file;
+ }
+
+ // Updating a blacklist file periodically with random words
+ public static File updateFile(File file) throws IOException {
+ writeToFile(file, getRandomWordSet());
+ return file;
+ }
+
+ // Writing random words to be blacklisted
+ public static void writeToFile(File file, Set<String> content) throws IOException{
+ FileWriter fw = new FileWriter(file.getCanonicalPath(), false);
+ BufferedWriter bw = new BufferedWriter(fw);
+ Iterator<String> iter = content.iterator();
+ while(iter.hasNext()) {
+ bw.write(iter.next());
+ }
+ bw.close();
}
public static void main(String[] args) {
- // Basic blob store API calls create, read, update and delete
+ prepare();
BlobStoreAPIWordCountTopology wc = new BlobStoreAPIWordCountTopology();
try {
- wc.prepareAndPerformBasicOperations();
+ File file = createFile(fileName);
// Creating blob again before launching topology
- createBlobWithContent(key, store, new SettableBlobMeta(WORLD_EVERYTHING));
- LOG.info(getBlobContent(key, store));
+ createBlobWithContent(key, store, file);
// Launching word count topology with blob updates
wc.buildAndLaunchWordCountTopology(args);
- // Updating blobs few times
- for (int i = 0; i < 10; i++) {
- updateBlobWithContent(key, store);
- Utils.sleep(100);
+ // Updating file few times every 5 seconds
+ for(int i=0; i<10; i++) {
+ updateBlobWithContent(key, store, updateFile(file));
+ Utils.sleep(5000);
}
} catch (KeyAlreadyExistsException kae) {
LOG.info("Key already exists {}", kae);
[4/8] storm git commit: created an alias to read from disk on worker
Posted by bo...@apache.org.
created an alias to read from disk on worker
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b04eb6d8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b04eb6d8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b04eb6d8
Branch: refs/heads/master
Commit: b04eb6d834c5337f7bc92790aa0cef54f5ac75c2
Parents: 3a62795
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 15 21:20:27 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 15 21:20:27 2015 -0600
----------------------------------------------------------------------
.../starter/BlobStoreAPIWordCountTopology.java | 61 ++++++++------------
1 file changed, 23 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b04eb6d8/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index 5877f52..6f3e441 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -19,7 +19,6 @@ package storm.starter;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
-import backtype.storm.LocalCluster;
import backtype.storm.blobstore.AtomicOutputStream;
import backtype.storm.blobstore.ClientBlobStore;
import backtype.storm.blobstore.InputStreamWithMeta;
@@ -56,7 +55,6 @@ import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -128,16 +126,16 @@ public class BlobStoreAPIWordCountTopology {
public static class FilterWords extends BaseBasicBolt {
String key = "key";
+ String fileName = "blacklist.txt";
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
- ClientBlobStore store = Utils.getClientBlobStore(Utils.readStormConfig());
String word = tuple.getString(0);
- Set<String> wordSet = parseBlobContent(getBlobContent(key, store).toString());
+ Set<String> wordSet = parseFile(fileName);
if (!wordSet.contains(word)) {
collector.emit(new Values(word));
}
- } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
+ } catch (IOException exp) {
throw new RuntimeException(exp);
}
}
@@ -157,17 +155,9 @@ public class BlobStoreAPIWordCountTopology {
Config conf = new Config();
conf.setDebug(true);
try {
- if (args != null && args.length > 0) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
- } else {
- conf.setMaxTaskParallelism(3);
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("word-count", conf, builder.createTopology());
- Thread.sleep(10000);
- cluster.shutdown();
- }
- } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException | InterruptedException exp) {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+ } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException exp) {
throw new RuntimeException(exp);
}
}
@@ -176,7 +166,6 @@ public class BlobStoreAPIWordCountTopology {
// storm blobstore create --file blacklist.txt --acl o::rwa key
private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
throws AuthorizationException, KeyAlreadyExistsException, IOException,KeyNotFoundException {
- // For more on how to set acls while creating the blob please look at distcache-blobstore.md documentation
String stringBlobACL = "o::rwa";
AccessControl blobACL = BlobStoreAclHandler.parseAccessControl(stringBlobACL);
List<AccessControl> acls = new LinkedList<AccessControl>();
@@ -187,21 +176,6 @@ public class BlobStoreAPIWordCountTopology {
blobStream.close();
}
- // Equivalent read command on command line
- // storm blobstore cat --file blacklist.txt key
- private static StringBuilder getBlobContent(String blobKey, ClientBlobStore clientBlobStore)
- throws AuthorizationException, KeyNotFoundException, IOException {
- InputStreamWithMeta blobInputStream = clientBlobStore.getBlob(blobKey);
- BufferedReader br = new BufferedReader(new InputStreamReader(blobInputStream));
- StringBuilder blobContent = new StringBuilder();
- String line;
- while ((line = br.readLine()) != null) {
- blobContent.append(line);
- blobContent.append("\n");
- }
- return blobContent;
- }
-
// Equivalent update command on command line
// storm blobstore update --file blacklist.txt key
private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
@@ -230,22 +204,28 @@ public class BlobStoreAPIWordCountTopology {
return randomWordSet;
}
- private static Set<String> parseBlobContent(String content) {
- StringTokenizer tokens = new StringTokenizer(content, "\r\n");
+ private static Set<String> parseFile(String fileName) throws IOException {
+ File file = new File(fileName);
Set<String> wordSet = new HashSet<>();
+ if (!file.exists()) {
+ return wordSet;
+ }
+ StringTokenizer tokens = new StringTokenizer(readFile(file).toString(), "\r\n");
while (tokens.hasMoreElements()) {
wordSet.add(tokens.nextToken());
}
+ LOG.info("parseFile {}", wordSet);
return wordSet;
}
private static StringBuilder readFile(File file) throws IOException {
String line;
StringBuilder fileContent = new StringBuilder();
- BufferedReader br = new BufferedReader(new FileReader(file.getCanonicalFile()));
+ // Do not use canonical file name here
+ BufferedReader br = new BufferedReader(new FileReader(file));
while ((line = br.readLine()) != null) {
fileContent.append(line);
- fileContent.append("\n");
+ fileContent.append(System.lineSeparator());
}
return fileContent;
}
@@ -270,11 +250,12 @@ public class BlobStoreAPIWordCountTopology {
// Writing random words to be blacklisted
public static void writeToFile(File file, Set<String> content) throws IOException{
- FileWriter fw = new FileWriter(file.getCanonicalPath(), false);
+ FileWriter fw = new FileWriter(file, false);
BufferedWriter bw = new BufferedWriter(fw);
Iterator<String> iter = content.iterator();
while(iter.hasNext()) {
bw.write(iter.next());
+ bw.write(System.lineSeparator());
}
bw.close();
}
@@ -286,7 +267,11 @@ public class BlobStoreAPIWordCountTopology {
File file = createFile(fileName);
// Creating blob again before launching topology
createBlobWithContent(key, store, file);
- // Launching word count topology with blob updates
+ // Blobstore launch command with topology blobstore map
+ // Here we are giving it a local name so that we can read from the file
+ // bin/storm jar examples/storm-starter/storm-starter-topologies-0.11.0-SNAPSHOT.jar
+ // storm.starter.BlobStoreAPIWordCountTopology bl -c
+ // topology.blobstore.map='{"key":{"localname":"blacklist.txt", "uncompress":"false"}}'
wc.buildAndLaunchWordCountTopology(args);
// Updating file few times every 5 seconds
for(int i=0; i<10; i++) {
[2/8] storm git commit: refactored code
Posted by bo...@apache.org.
refactored code
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2e0c5e55
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2e0c5e55
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2e0c5e55
Branch: refs/heads/master
Commit: 2e0c5e558f30d76dddcf97f1f46fbfa8e400f071
Parents: 7c972e4
Author: Sanket <sc...@untilservice-lm>
Authored: Sat Dec 12 10:50:16 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Sat Dec 12 10:50:16 2015 -0600
----------------------------------------------------------------------
.../starter/BlobStoreAPIWordCountTopology.java | 16 +++++++---------
1 file changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2e0c5e55/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index dda038d..e0a4679 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -42,6 +42,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.blobstore.BlobStoreAclHandler;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
@@ -62,14 +63,11 @@ public class BlobStoreAPIWordCountTopology {
private static NimbusBlobStore store = new NimbusBlobStore(); // Client API to invoke blob store API functionality
private static String key = "key1";
private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
- private static final int READ = 0x01;
- private static final int WRITE = 0x02;
- private static final int ADMIN = 0x04;
- private static final List<AccessControl> WORLD_EVERYTHING =
- Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN));
+ private static final List<AccessControl> WORLD_EVERYTHING = Arrays.asList(new AccessControl(AccessControlType.OTHER,
+ BlobStoreAclHandler.READ | BlobStoreAclHandler.WRITE | BlobStoreAclHandler.ADMIN));
// Spout implementation
- public static class RandomSentenceSpout extends BaseRichSpout {
+ public static class BlobStoreSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
BlobStoreAPIWordCountTopology wc;
String key;
@@ -105,7 +103,7 @@ public class BlobStoreAPIWordCountTopology {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
+ declarer.declare(new Fields("sentence"));
}
}
@@ -152,7 +150,7 @@ public class BlobStoreAPIWordCountTopology {
TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("spout", new RandomSentenceSpout(), 5);
+ builder.setSpout("spout", new BlobStoreSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
@@ -238,7 +236,7 @@ public class BlobStoreAPIWordCountTopology {
Utils.sleep(100);
}
} catch (KeyAlreadyExistsException kae) {
- // Do nothing
+ LOG.info("Key already exists {}", kae);
} catch (AuthorizationException | KeyNotFoundException | IOException exp) {
throw new RuntimeException(exp);
}