Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/04 20:49:41 UTC

[1/8] storm git commit: blobstore api example

Repository: storm
Updated Branches:
  refs/heads/master 57d7cf6e5 -> 59a960552


blobstore api example


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7c972e46
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7c972e46
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7c972e46

Branch: refs/heads/master
Commit: 7c972e460fc171dcfeb045957cd7e56af6d198bc
Parents: 19b8b7d
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 8 12:01:28 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 8 12:01:28 2015 -0600

----------------------------------------------------------------------
 .../starter/BlobStoreAPIWordCountTopology.java  | 248 +++++++++++++++++++
 .../jvm/storm/starter/WordCountTopology.java    |   1 -
 2 files changed, 248 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7c972e46/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
new file mode 100644
index 0000000..dda038d
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.LocalCluster;
+import backtype.storm.blobstore.AtomicOutputStream;
+import backtype.storm.blobstore.ClientBlobStore;
+import backtype.storm.blobstore.InputStreamWithMeta;
+import backtype.storm.blobstore.NimbusBlobStore;
+
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.ShellBolt;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class BlobStoreAPIWordCountTopology {
+    private static NimbusBlobStore store = new NimbusBlobStore(); // Client API to invoke blob store API functionality
+    private static String key = "key1";
+    private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
+    private static final int READ = 0x01;
+    private static final int WRITE = 0x02;
+    private static final int ADMIN = 0x04;
+    private static final List<AccessControl> WORLD_EVERYTHING =
+            Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN));
+
+    // Spout implementation
+    public static class RandomSentenceSpout extends BaseRichSpout {
+        SpoutOutputCollector _collector;
+        BlobStoreAPIWordCountTopology wc;
+        String key;
+        NimbusBlobStore store;
+
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            _collector = collector;
+            wc = new BlobStoreAPIWordCountTopology();
+            key = "key1";
+            store = new NimbusBlobStore();
+            store.prepare(Utils.readStormConfig());
+        }
+
+        @Override
+        public void nextTuple() {
+            Utils.sleep(100);
+            try {
+                 _collector.emit(new Values(wc.getBlobContent(key, store)));
+            } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
+                throw new RuntimeException(exp);
+            }
+        }
+
+        @Override
+        public void ack(Object id) {
+        }
+
+        @Override
+        public void fail(Object id) {
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+    }
+
+    // Bolt implementation
+    public static class SplitSentence extends ShellBolt implements IRichBolt {
+
+        public SplitSentence() {
+            super("python", "splitsentence.py");
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+        @Override
+        public Map<String, Object> getComponentConfiguration() {
+            return null;
+        }
+    }
+
+    public static class WordCount extends BaseBasicBolt {
+        Map<String, Integer> counts = new HashMap<String, Integer>();
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null)
+                count = 0;
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    public void buildAndLaunchWordCountTopology(String[] args) {
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout("spout", new RandomSentenceSpout(), 5);
+
+        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        try {
+            if (args != null && args.length > 0) {
+                conf.setNumWorkers(3);
+                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+            } else {
+                conf.setMaxTaskParallelism(3);
+
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology("word-count", conf, builder.createTopology());
+
+                Thread.sleep(10000);
+
+                cluster.shutdown();
+            }
+        } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException | InterruptedException exp) {
+            throw new RuntimeException(exp);
+        }
+    }
+
+    private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, SettableBlobMeta settableBlobMeta)
+            throws AuthorizationException, KeyAlreadyExistsException, IOException,KeyNotFoundException {
+        AtomicOutputStream blobStream = clientBlobStore.createBlob(blobKey,settableBlobMeta);
+        blobStream.write(getRandomSentence().getBytes());
+        blobStream.close();
+    }
+
+    private static String getBlobContent(String blobKey, ClientBlobStore clientBlobStore)
+            throws AuthorizationException, KeyNotFoundException, IOException {
+        InputStreamWithMeta blobInputStream = clientBlobStore.getBlob(blobKey);
+        BufferedReader r = new BufferedReader(new InputStreamReader(blobInputStream));
+        return r.readLine();
+    }
+
+    private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore)
+            throws KeyNotFoundException, AuthorizationException, IOException {
+        AtomicOutputStream blobOutputStream = clientBlobStore.updateBlob(blobKey);
+        blobOutputStream.write(getRandomSentence().getBytes());
+        blobOutputStream.close();
+    }
+
+    private static String getRandomSentence() {
+        String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
+                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
+        String sentence = sentences[new Random().nextInt(sentences.length)];
+        return sentence;
+    }
+
+    private void prepareAndPerformBasicOperations()
+            throws KeyAlreadyExistsException, AuthorizationException, KeyNotFoundException, IOException{
+        Map conf = Utils.readStormConfig();
+        store.prepare(conf);
+        // create blob with content
+        createBlobWithContent(key, store, new SettableBlobMeta(WORLD_EVERYTHING));
+        // read blob content
+        LOG.info(getBlobContent(key, store));
+        // update blob
+        updateBlobWithContent(key, store);
+        LOG.info(getBlobContent(key, store));
+        // delete blob through API
+        store.deleteBlob(key);
+    }
+
+    public static void main(String[] args) {
+        // Basic blob store API calls create, read, update and delete
+        BlobStoreAPIWordCountTopology wc = new BlobStoreAPIWordCountTopology();
+        try {
+            wc.prepareAndPerformBasicOperations();
+            // Creating blob again before launching topology
+            createBlobWithContent(key, store, new SettableBlobMeta(WORLD_EVERYTHING));
+            LOG.info(getBlobContent(key, store));
+            // Launching word count topology with blob updates
+            wc.buildAndLaunchWordCountTopology(args);
+            // Updating blobs few times
+            for (int i = 0; i < 10; i++) {
+                updateBlobWithContent(key, store);
+                Utils.sleep(100);
+            }
+        } catch (KeyAlreadyExistsException kae) {
+            // Do nothing
+        } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
+            throw new RuntimeException(exp);
+        }
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/storm/blob/7c972e46/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
index 39184da..7260beb 100644
--- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
@@ -87,7 +87,6 @@ public class WordCountTopology {
     Config conf = new Config();
     conf.setDebug(true);
 
-
     if (args != null && args.length > 0) {
       conf.setNumWorkers(3);
 


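Taken out of the topology scaffolding, the blob store client calls this new example exercises boil down to the short create/read/update/delete sequence below. This is a condensed sketch for reference, not part of the commit; the class name BlobStoreCrudSketch is made up, but every API call and the ACL bit values are taken from the file above.

    import backtype.storm.blobstore.AtomicOutputStream;
    import backtype.storm.blobstore.NimbusBlobStore;
    import backtype.storm.generated.AccessControl;
    import backtype.storm.generated.AccessControlType;
    import backtype.storm.generated.SettableBlobMeta;
    import backtype.storm.utils.Utils;

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.util.Arrays;

    public class BlobStoreCrudSketch {
        public static void main(String[] args) throws Exception {
            // Client handle, configured from the local storm config as in the example's open()
            NimbusBlobStore store = new NimbusBlobStore();
            store.prepare(Utils.readStormConfig());

            // WORLD_EVERYTHING: read | write | admin for OTHER, the same bits the example defines
            SettableBlobMeta meta = new SettableBlobMeta(Arrays.asList(
                    new AccessControl(AccessControlType.OTHER, 0x01 | 0x02 | 0x04)));

            // create
            AtomicOutputStream out = store.createBlob("key1", meta);
            out.write("the cow jumped over the moon".getBytes());
            out.close();

            // read
            BufferedReader reader = new BufferedReader(new InputStreamReader(store.getBlob("key1")));
            System.out.println(reader.readLine());

            // update
            AtomicOutputStream updated = store.updateBlob("key1");
            updated.write("an apple a day keeps the doctor away".getBytes());
            updated.close();

            // delete
            store.deleteBlob("key1");
        }
    }

The topology's spout simply wraps the read step in nextTuple(), so each emitted sentence reflects whatever the blob contains at the time.
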
[8/8] storm git commit: Added STORM-1373 to Changelog

Posted by bo...@apache.org.
Added STORM-1373 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/59a96055
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/59a96055
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/59a96055

Branch: refs/heads/master
Commit: 59a9605520de54930fd9ba008b6594f98f2eb63c
Parents: cb6e923
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Jan 4 13:28:44 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Jan 4 13:28:44 2016 -0600

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/59a96055/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a5ef7c2..bb3717b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1373: Blobstore API sample example usage
  * STORM-1409: StormClientErrorHandler is not used
  * STORM-1411: Some fixes for storm-windowing
  * STORM-1399: Blobstore tests should write data to `target` so it gets removed when running `mvn clean`


[5/8] storm git commit: removed unused variable

Posted by bo...@apache.org.
removed unused variable


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f5829c2a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f5829c2a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f5829c2a

Branch: refs/heads/master
Commit: f5829c2a0344c310bb3703b565b433cadf7c15ea
Parents: b04eb6d
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 15 21:22:24 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 15 21:22:24 2015 -0600

----------------------------------------------------------------------
 .../src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java        | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f5829c2a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index 6f3e441..803b042 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -125,7 +125,6 @@ public class BlobStoreAPIWordCountTopology {
     }
 
     public static class FilterWords extends BaseBasicBolt {
-        String key = "key";
         String fileName = "blacklist.txt";
         @Override
         public void execute(Tuple tuple, BasicOutputCollector collector) {


[6/8] storm git commit: added polling to read from disk every 5 secs

Posted by bo...@apache.org.
added polling to read from disk every 5 secs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/51ae9135
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/51ae9135
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/51ae9135

Branch: refs/heads/master
Commit: 51ae9135779b25b1014a80021302758c73432cfb
Parents: f5829c2
Author: Sanket <sc...@untilservice-lm>
Authored: Wed Dec 16 12:49:00 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Wed Dec 16 12:49:00 2015 -0600

----------------------------------------------------------------------
 .../starter/BlobStoreAPIWordCountTopology.java  | 32 +++++++++++++++-----
 1 file changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/51ae9135/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index 803b042..250c418 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -125,18 +125,31 @@ public class BlobStoreAPIWordCountTopology {
     }
 
     public static class FilterWords extends BaseBasicBolt {
-        String fileName = "blacklist.txt";
+        boolean poll = false;
+        long pollTime;
+        Set<String> wordSet;
         @Override
         public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            // Poll every 5 seconds to refresh the wordSet that the
+            // FilterWords bolt uses to filter words
             try {
-                String word = tuple.getString(0);
-                Set<String> wordSet = parseFile(fileName);
-                if (!wordSet.contains(word)) {
-                    collector.emit(new Values(word));
+                if (!poll) {
+                    wordSet = parseFile(fileName);
+                    pollTime = System.currentTimeMillis();
+                    poll = true;
+                } else {
+                    if ((System.currentTimeMillis() - pollTime) > 5000) {
+                        wordSet = parseFile(fileName);
+                        pollTime = System.currentTimeMillis();
+                    }
                 }
             } catch (IOException exp) {
                 throw new RuntimeException(exp);
             }
+            if (wordSet != null && !wordSet.contains(word)) {
+                collector.emit(new Values(word));
+            }
         }
 
         @Override
@@ -213,14 +226,16 @@ public class BlobStoreAPIWordCountTopology {
         while (tokens.hasMoreElements()) {
             wordSet.add(tokens.nextToken());
         }
-        LOG.info("parseFile {}", wordSet);
+        LOG.debug("parseFile {}", wordSet);
         return wordSet;
     }
 
     private static StringBuilder readFile(File file) throws IOException {
         String line;
         StringBuilder fileContent = new StringBuilder();
-        // Do not use canonical file name here
+        // Do not use the canonical file name here: the file is read
+        // through a symbolic link, and updates are applied with an
+        // atomic move, so resolving the link could pin a stale target
         BufferedReader br = new BufferedReader(new FileReader(file));
         while ((line = br.readLine()) != null) {
             fileContent.append(line);
@@ -237,7 +252,6 @@ public class BlobStoreAPIWordCountTopology {
             file.createNewFile();
         }
         writeToFile(file, getRandomWordSet());
-        LOG.info(readFile(file).toString());
         return file;
     }
 
@@ -266,12 +280,14 @@ public class BlobStoreAPIWordCountTopology {
             File file = createFile(fileName);
             // Creating blob again before launching topology
             createBlobWithContent(key, store, file);
+
+            // Blobstore launch command with topology blobstore map
             // Here we are giving it a local name so that we can read from the file
             // bin/storm jar examples/storm-starter/storm-starter-topologies-0.11.0-SNAPSHOT.jar
             // storm.starter.BlobStoreAPIWordCountTopology bl -c
             // topology.blobstore.map='{"key":{"localname":"blacklist.txt", "uncompress":"false"}}'
             wc.buildAndLaunchWordCountTopology(args);
+
             // Updating file few times every 5 seconds
             for(int i=0; i<10; i++) {
                 updateBlobWithContent(key, store, updateFile(file));


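The hunk above weaves the 5-second refresh into execute() through the poll and pollTime fields. Factored out for clarity, the same refresh-on-interval pattern looks like the sketch below; PeriodicWordSet and its method names are illustrative, not part of the commit, and the parsing mirrors the example's parseFile().

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    public class PeriodicWordSet {
        private final String fileName;
        private final long intervalMillis;
        private long lastPollTime;   // 0 until the first load
        private Set<String> wordSet;

        public PeriodicWordSet(String fileName, long intervalMillis) {
            this.fileName = fileName;
            this.intervalMillis = intervalMillis;
        }

        // Re-read the blacklist at most once per interval, the same bookkeeping
        // the FilterWords bolt now does inline with poll and pollTime.
        public Set<String> current() throws IOException {
            long now = System.currentTimeMillis();
            if (wordSet == null || (now - lastPollTime) > intervalMillis) {
                wordSet = parse(new File(fileName));
                lastPollTime = now;
            }
            return wordSet;
        }

        private static Set<String> parse(File file) throws IOException {
            Set<String> words = new HashSet<>();
            if (!file.exists()) {
                return words;
            }
            try (BufferedReader br = new BufferedReader(new FileReader(file))) {
                String line;
                while ((line = br.readLine()) != null) {
                    words.add(line.trim());
                }
            }
            return words;
        }
    }

With a helper like this, the bolt body reduces to looking up current() and emitting the word whenever the set does not contain it.
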
[7/8] storm git commit: Merge branch 'storm-blobstore-example' of https://github.com/redsanket/storm into STORM-1373

Posted by bo...@apache.org.
Merge branch 'storm-blobstore-example' of https://github.com/redsanket/storm into STORM-1373

STORM-1373: Blobstore API sample example usage


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cb6e9234
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cb6e9234
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cb6e9234

Branch: refs/heads/master
Commit: cb6e9234bb22523ef8023303a93190a27cf19786
Parents: 57d7cf6 51ae913
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Jan 4 13:28:13 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Jan 4 13:28:13 2016 -0600

----------------------------------------------------------------------
 .../starter/BlobStoreAPIWordCountTopology.java  | 304 +++++++++++++++++++
 .../jvm/storm/starter/WordCountTopology.java    |   1 -
 2 files changed, 304 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[3/8] storm git commit: made modifications to read content from file

Posted by bo...@apache.org.
made modifications to read content from file


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3a627953
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3a627953
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3a627953

Branch: refs/heads/master
Commit: 3a627953b0c2221c049d7ab0a8c6bd0b3dc1b430
Parents: 2e0c5e5
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 15 10:59:06 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 15 10:59:06 2015 -0600

----------------------------------------------------------------------
 .../starter/BlobStoreAPIWordCountTopology.java  | 194 ++++++++++++-------
 1 file changed, 126 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3a627953/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index e0a4679..5877f52 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -51,46 +51,46 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
+import java.util.StringTokenizer;
 
 public class BlobStoreAPIWordCountTopology {
-    private static NimbusBlobStore store = new NimbusBlobStore(); // Client API to invoke blob store API functionality
-    private static String key = "key1";
+    private static ClientBlobStore store; // Client API to invoke blob store API functionality
+    private static String key = "key";
+    private static String fileName = "blacklist.txt";
     private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
-    private static final List<AccessControl> WORLD_EVERYTHING = Arrays.asList(new AccessControl(AccessControlType.OTHER,
-            BlobStoreAclHandler.READ | BlobStoreAclHandler.WRITE | BlobStoreAclHandler.ADMIN));
+
+    public static void prepare() {
+        Config conf = new Config();
+        conf.putAll(Utils.readStormConfig());
+        store = Utils.getClientBlobStore(conf);
+    }
 
     // Spout implementation
-    public static class BlobStoreSpout extends BaseRichSpout {
+    public static class RandomSentenceSpout extends BaseRichSpout {
         SpoutOutputCollector _collector;
-        BlobStoreAPIWordCountTopology wc;
-        String key;
-        NimbusBlobStore store;
-
 
         @Override
         public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
             _collector = collector;
-            wc = new BlobStoreAPIWordCountTopology();
-            key = "key1";
-            store = new NimbusBlobStore();
-            store.prepare(Utils.readStormConfig());
         }
 
         @Override
         public void nextTuple() {
             Utils.sleep(100);
-            try {
-                 _collector.emit(new Values(wc.getBlobContent(key, store)));
-            } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
-                throw new RuntimeException(exp);
-            }
+            _collector.emit(new Values(getRandomSentence()));
         }
 
         @Override
@@ -126,50 +126,45 @@ public class BlobStoreAPIWordCountTopology {
         }
     }
 
-    public static class WordCount extends BaseBasicBolt {
-        Map<String, Integer> counts = new HashMap<String, Integer>();
-
+    public static class FilterWords extends BaseBasicBolt {
+        String key = "key";
         @Override
         public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String word = tuple.getString(0);
-            Integer count = counts.get(word);
-            if (count == null)
-                count = 0;
-            count++;
-            counts.put(word, count);
-            collector.emit(new Values(word, count));
+            try {
+                ClientBlobStore store = Utils.getClientBlobStore(Utils.readStormConfig());
+                String word = tuple.getString(0);
+                Set<String> wordSet = parseBlobContent(getBlobContent(key, store).toString());
+                if (!wordSet.contains(word)) {
+                    collector.emit(new Values(word));
+                }
+            } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
+                throw new RuntimeException(exp);
+            }
         }
 
         @Override
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word", "count"));
+            declarer.declare(new Fields("word"));
         }
     }
 
     public void buildAndLaunchWordCountTopology(String[] args) {
-
         TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout("spout", new BlobStoreSpout(), 5);
-
+        builder.setSpout("spout", new RandomSentenceSpout(), 5);
         builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
-        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+        builder.setBolt("filter", new FilterWords(), 6).shuffleGrouping("split");
 
         Config conf = new Config();
         conf.setDebug(true);
-
         try {
             if (args != null && args.length > 0) {
                 conf.setNumWorkers(3);
                 StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
             } else {
                 conf.setMaxTaskParallelism(3);
-
                 LocalCluster cluster = new LocalCluster();
                 cluster.submitTopology("word-count", conf, builder.createTopology());
-
                 Thread.sleep(10000);
-
                 cluster.shutdown();
             }
         } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException | InterruptedException exp) {
@@ -177,24 +172,42 @@ public class BlobStoreAPIWordCountTopology {
         }
     }
 
-    private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, SettableBlobMeta settableBlobMeta)
+    // Equivalent create command on command line
+    // storm blobstore create --file blacklist.txt --acl o::rwa key
+    private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
             throws AuthorizationException, KeyAlreadyExistsException, IOException,KeyNotFoundException {
+        // For more on how to set acls while creating the blob please look at distcache-blobstore.md documentation
+        String stringBlobACL = "o::rwa";
+        AccessControl blobACL = BlobStoreAclHandler.parseAccessControl(stringBlobACL);
+        List<AccessControl> acls = new LinkedList<AccessControl>();
+        acls.add(blobACL); // more ACLs can be added here
+        SettableBlobMeta settableBlobMeta = new SettableBlobMeta(acls);
         AtomicOutputStream blobStream = clientBlobStore.createBlob(blobKey,settableBlobMeta);
-        blobStream.write(getRandomSentence().getBytes());
+        blobStream.write(readFile(file).toString().getBytes());
         blobStream.close();
     }
 
-    private static String getBlobContent(String blobKey, ClientBlobStore clientBlobStore)
+    // Equivalent read command on command line
+    // storm blobstore cat --file blacklist.txt key
+    private static StringBuilder getBlobContent(String blobKey, ClientBlobStore clientBlobStore)
             throws AuthorizationException, KeyNotFoundException, IOException {
         InputStreamWithMeta blobInputStream = clientBlobStore.getBlob(blobKey);
-        BufferedReader r = new BufferedReader(new InputStreamReader(blobInputStream));
-        return r.readLine();
+        BufferedReader br = new BufferedReader(new InputStreamReader(blobInputStream));
+        StringBuilder blobContent = new StringBuilder();
+        String line;
+        while ((line = br.readLine()) != null) {
+            blobContent.append(line);
+            blobContent.append("\n");
+        }
+        return blobContent;
     }
 
-    private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore)
+    // Equivalent update command on command line
+    // storm blobstore update --file blacklist.txt key
+    private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
             throws KeyNotFoundException, AuthorizationException, IOException {
         AtomicOutputStream blobOutputStream = clientBlobStore.updateBlob(blobKey);
-        blobOutputStream.write(getRandomSentence().getBytes());
+        blobOutputStream.write(readFile(file).toString().getBytes());
         blobOutputStream.close();
     }
 
@@ -205,35 +218,80 @@ public class BlobStoreAPIWordCountTopology {
         return sentence;
     }
 
-    private void prepareAndPerformBasicOperations()
-            throws KeyAlreadyExistsException, AuthorizationException, KeyNotFoundException, IOException{
-        Map conf = Utils.readStormConfig();
-        store.prepare(conf);
-        // create blob with content
-        createBlobWithContent(key, store, new SettableBlobMeta(WORLD_EVERYTHING));
-        // read blob content
-        LOG.info(getBlobContent(key, store));
-        // update blob
-        updateBlobWithContent(key, store);
-        LOG.info(getBlobContent(key, store));
-        // delete blob through API
-        store.deleteBlob(key);
+    private static Set<String> getRandomWordSet() {
+        Set<String> randomWordSet = new HashSet<>();
+        Random random = new Random();
+        String[] words = new String[]{ "cow", "jumped", "over", "the", "moon", "apple", "day", "doctor", "away",
+                "four", "seven", "ago", "snow", "white", "seven", "dwarfs", "nature", "two" };
+        // Choosing at most 5 words to update the blacklist file for filtering
+        for (int i=0; i<5; i++) {
+            randomWordSet.add(words[random.nextInt(words.length)]);
+        }
+        return randomWordSet;
+    }
+
+    private static Set<String> parseBlobContent(String content) {
+        StringTokenizer tokens = new StringTokenizer(content, "\r\n");
+        Set<String> wordSet = new HashSet<>();
+        while (tokens.hasMoreElements()) {
+            wordSet.add(tokens.nextToken());
+        }
+        return wordSet;
+    }
+
+    private static StringBuilder readFile(File file) throws IOException {
+        String line;
+        StringBuilder fileContent = new StringBuilder();
+        BufferedReader br = new BufferedReader(new FileReader(file.getCanonicalFile()));
+        while ((line = br.readLine()) != null) {
+            fileContent.append(line);
+            fileContent.append("\n");
+        }
+        return fileContent;
+    }
+
+    // Creating a blacklist file to read from the disk
+    public static File createFile(String fileName) throws IOException {
+        File file = null;
+        file = new File(fileName);
+        if (!file.exists()) {
+            file.createNewFile();
+        }
+        writeToFile(file, getRandomWordSet());
+        LOG.info(readFile(file).toString());
+        return file;
+    }
+
+    // Updating a blacklist file periodically with random words
+    public static File updateFile(File file) throws IOException {
+        writeToFile(file, getRandomWordSet());
+        return file;
+    }
+
+    // Writing random words to be blacklisted
+    public static void writeToFile(File file, Set<String> content) throws IOException{
+        FileWriter fw = new FileWriter(file.getCanonicalPath(), false);
+        BufferedWriter bw = new BufferedWriter(fw);
+        Iterator<String> iter = content.iterator();
+        while(iter.hasNext()) {
+            bw.write(iter.next());
+        }
+        bw.close();
     }
 
     public static void main(String[] args) {
-        // Basic blob store API calls create, read, update and delete
+        prepare();
         BlobStoreAPIWordCountTopology wc = new BlobStoreAPIWordCountTopology();
         try {
-            wc.prepareAndPerformBasicOperations();
+            File file = createFile(fileName);
             // Creating blob again before launching topology
-            createBlobWithContent(key, store, new SettableBlobMeta(WORLD_EVERYTHING));
-            LOG.info(getBlobContent(key, store));
+            createBlobWithContent(key, store, file);
             // Launching word count topology with blob updates
             wc.buildAndLaunchWordCountTopology(args);
-            // Updating blobs few times
-            for (int i = 0; i < 10; i++) {
-                updateBlobWithContent(key, store);
-                Utils.sleep(100);
+            // Updating file few times every 5 seconds
+            for(int i=0; i<10; i++) {
+                updateBlobWithContent(key, store, updateFile(file));
+                Utils.sleep(5000);
             }
         } catch (KeyAlreadyExistsException kae) {
             LOG.info("Key already exists {}", kae);


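The new createBlobWithContent() above builds its ACL from the string "o::rwa" and streams a local file into the blob. Reduced to a standalone sketch for reference (the class name and the java.nio file read are illustrative; the blob store calls and the ACL parsing are the ones used in the hunk):

    import backtype.storm.Config;
    import backtype.storm.blobstore.AtomicOutputStream;
    import backtype.storm.blobstore.BlobStoreAclHandler;
    import backtype.storm.blobstore.ClientBlobStore;
    import backtype.storm.generated.AccessControl;
    import backtype.storm.generated.SettableBlobMeta;
    import backtype.storm.utils.Utils;

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.LinkedList;
    import java.util.List;

    public class CreateBlobFromFileSketch {
        public static void main(String[] args) throws Exception {
            // Resolve a client blob store from the cluster config, as prepare() does above
            Config conf = new Config();
            conf.putAll(Utils.readStormConfig());
            ClientBlobStore store = Utils.getClientBlobStore(conf);

            // "o::rwa" grants read, write and admin to OTHER
            AccessControl acl = BlobStoreAclHandler.parseAccessControl("o::rwa");
            List<AccessControl> acls = new LinkedList<AccessControl>();
            acls.add(acl);

            // Equivalent command line: storm blobstore create --file blacklist.txt --acl o::rwa key
            AtomicOutputStream out = store.createBlob("key", new SettableBlobMeta(acls));
            out.write(Files.readAllBytes(Paths.get("blacklist.txt")));
            out.close();
        }
    }

getBlobContent() and updateBlobWithContent() in the same hunk are the matching counterparts of the storm blobstore cat and storm blobstore update commands noted in the comments.
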
[4/8] storm git commit: created an alias to read from disk on worker

Posted by bo...@apache.org.
created an alias to read from disk on worker


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b04eb6d8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b04eb6d8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b04eb6d8

Branch: refs/heads/master
Commit: b04eb6d834c5337f7bc92790aa0cef54f5ac75c2
Parents: 3a62795
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 15 21:20:27 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 15 21:20:27 2015 -0600

----------------------------------------------------------------------
 .../starter/BlobStoreAPIWordCountTopology.java  | 61 ++++++++------------
 1 file changed, 23 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b04eb6d8/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index 5877f52..6f3e441 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -19,7 +19,6 @@ package storm.starter;
 
 import backtype.storm.Config;
 import backtype.storm.StormSubmitter;
-import backtype.storm.LocalCluster;
 import backtype.storm.blobstore.AtomicOutputStream;
 import backtype.storm.blobstore.ClientBlobStore;
 import backtype.storm.blobstore.InputStreamWithMeta;
@@ -56,7 +55,6 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -128,16 +126,16 @@ public class BlobStoreAPIWordCountTopology {
 
     public static class FilterWords extends BaseBasicBolt {
         String key = "key";
+        String fileName = "blacklist.txt";
         @Override
         public void execute(Tuple tuple, BasicOutputCollector collector) {
             try {
-                ClientBlobStore store = Utils.getClientBlobStore(Utils.readStormConfig());
                 String word = tuple.getString(0);
-                Set<String> wordSet = parseBlobContent(getBlobContent(key, store).toString());
+                Set<String> wordSet = parseFile(fileName);
                 if (!wordSet.contains(word)) {
                     collector.emit(new Values(word));
                 }
-            } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
+            } catch (IOException exp) {
                 throw new RuntimeException(exp);
             }
         }
@@ -157,17 +155,9 @@ public class BlobStoreAPIWordCountTopology {
         Config conf = new Config();
         conf.setDebug(true);
         try {
-            if (args != null && args.length > 0) {
-                conf.setNumWorkers(3);
-                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-            } else {
-                conf.setMaxTaskParallelism(3);
-                LocalCluster cluster = new LocalCluster();
-                cluster.submitTopology("word-count", conf, builder.createTopology());
-                Thread.sleep(10000);
-                cluster.shutdown();
-            }
-        } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException | InterruptedException exp) {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+        } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException exp) {
             throw new RuntimeException(exp);
         }
     }
@@ -176,7 +166,6 @@ public class BlobStoreAPIWordCountTopology {
     // storm blobstore create --file blacklist.txt --acl o::rwa key
     private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
             throws AuthorizationException, KeyAlreadyExistsException, IOException,KeyNotFoundException {
-        // For more on how to set acls while creating the blob please look at distcache-blobstore.md documentation
         String stringBlobACL = "o::rwa";
         AccessControl blobACL = BlobStoreAclHandler.parseAccessControl(stringBlobACL);
         List<AccessControl> acls = new LinkedList<AccessControl>();
@@ -187,21 +176,6 @@ public class BlobStoreAPIWordCountTopology {
         blobStream.close();
     }
 
-    // Equivalent read command on command line
-    // storm blobstore cat --file blacklist.txt key
-    private static StringBuilder getBlobContent(String blobKey, ClientBlobStore clientBlobStore)
-            throws AuthorizationException, KeyNotFoundException, IOException {
-        InputStreamWithMeta blobInputStream = clientBlobStore.getBlob(blobKey);
-        BufferedReader br = new BufferedReader(new InputStreamReader(blobInputStream));
-        StringBuilder blobContent = new StringBuilder();
-        String line;
-        while ((line = br.readLine()) != null) {
-            blobContent.append(line);
-            blobContent.append("\n");
-        }
-        return blobContent;
-    }
-
     // Equivalent update command on command line
     // storm blobstore update --file blacklist.txt key
     private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
@@ -230,22 +204,28 @@ public class BlobStoreAPIWordCountTopology {
         return randomWordSet;
     }
 
-    private static Set<String> parseBlobContent(String content) {
-        StringTokenizer tokens = new StringTokenizer(content, "\r\n");
+    private static Set<String> parseFile(String fileName) throws IOException {
+        File file = new File(fileName);
         Set<String> wordSet = new HashSet<>();
+        if (!file.exists()) {
+            return wordSet;
+        }
+        StringTokenizer tokens = new StringTokenizer(readFile(file).toString(), "\r\n");
         while (tokens.hasMoreElements()) {
             wordSet.add(tokens.nextToken());
         }
+        LOG.info("parseFile {}", wordSet);
         return wordSet;
     }
 
     private static StringBuilder readFile(File file) throws IOException {
         String line;
         StringBuilder fileContent = new StringBuilder();
-        BufferedReader br = new BufferedReader(new FileReader(file.getCanonicalFile()));
+        // Do not use canonical file name here
+        BufferedReader br = new BufferedReader(new FileReader(file));
         while ((line = br.readLine()) != null) {
             fileContent.append(line);
-            fileContent.append("\n");
+            fileContent.append(System.lineSeparator());
         }
         return fileContent;
     }
@@ -270,11 +250,12 @@ public class BlobStoreAPIWordCountTopology {
 
     // Writing random words to be blacklisted
     public static void writeToFile(File file, Set<String> content) throws IOException{
-        FileWriter fw = new FileWriter(file.getCanonicalPath(), false);
+        FileWriter fw = new FileWriter(file, false);
         BufferedWriter bw = new BufferedWriter(fw);
         Iterator<String> iter = content.iterator();
         while(iter.hasNext()) {
             bw.write(iter.next());
+            bw.write(System.lineSeparator());
         }
         bw.close();
     }
@@ -286,7 +267,11 @@ public class BlobStoreAPIWordCountTopology {
             File file = createFile(fileName);
             // Creating blob again before launching topology
             createBlobWithContent(key, store, file);
-            // Launching word count topology with blob updates
+            // Blobstore launch command with topology blobstore map
+            // Here we are giving it a local name so that we can read from the file
+            // bin/storm jar examples/storm-starter/storm-starter-topologies-0.11.0-SNAPSHOT.jar
+            // storm.starter.BlobStoreAPIWordCountTopology bl -c
+            // topology.blobstore.map='{"key":{"localname":"blacklist.txt", "uncompress":"false"}}'
             wc.buildAndLaunchWordCountTopology(args);
             // Updating file few times every 5 seconds
             for(int i=0; i<10; i++) {


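The comment block above documents the alias only as a launch-time -c flag. For comparison, the same mapping could also be placed on the Config before submission; this is a hedged sketch that just mirrors the JSON from the comment through Config's java.util.Map interface, and is not something this commit itself does.

    import backtype.storm.Config;

    import java.util.HashMap;
    import java.util.Map;

    public class BlobstoreMapConfigSketch {
        public static void main(String[] args) {
            // Mirrors: topology.blobstore.map='{"key":{"localname":"blacklist.txt", "uncompress":"false"}}'
            Map<String, Object> blobSettings = new HashMap<String, Object>();
            blobSettings.put("localname", "blacklist.txt");
            blobSettings.put("uncompress", "false");

            Map<String, Map<String, Object>> blobstoreMap = new HashMap<String, Map<String, Object>>();
            blobstoreMap.put("key", blobSettings);

            Config conf = new Config();
            conf.put("topology.blobstore.map", blobstoreMap);
            // conf would then go to StormSubmitter.submitTopologyWithProgressBar(name, conf, topology)
        }
    }

Either way, the supervisor materializes the blob under the local name blacklist.txt, which is why FilterWords can read it straight from disk.
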
[2/8] storm git commit: refactored code

Posted by bo...@apache.org.
refactored code


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2e0c5e55
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2e0c5e55
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2e0c5e55

Branch: refs/heads/master
Commit: 2e0c5e558f30d76dddcf97f1f46fbfa8e400f071
Parents: 7c972e4
Author: Sanket <sc...@untilservice-lm>
Authored: Sat Dec 12 10:50:16 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Sat Dec 12 10:50:16 2015 -0600

----------------------------------------------------------------------
 .../starter/BlobStoreAPIWordCountTopology.java      | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2e0c5e55/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
index dda038d..e0a4679 100644
--- a/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/BlobStoreAPIWordCountTopology.java
@@ -42,6 +42,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseBasicBolt;
 import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.blobstore.BlobStoreAclHandler;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
@@ -62,14 +63,11 @@ public class BlobStoreAPIWordCountTopology {
     private static NimbusBlobStore store = new NimbusBlobStore(); // Client API to invoke blob store API functionality
     private static String key = "key1";
     private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
-    private static final int READ = 0x01;
-    private static final int WRITE = 0x02;
-    private static final int ADMIN = 0x04;
-    private static final List<AccessControl> WORLD_EVERYTHING =
-            Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN));
+    private static final List<AccessControl> WORLD_EVERYTHING = Arrays.asList(new AccessControl(AccessControlType.OTHER,
+            BlobStoreAclHandler.READ | BlobStoreAclHandler.WRITE | BlobStoreAclHandler.ADMIN));
 
     // Spout implementation
-    public static class RandomSentenceSpout extends BaseRichSpout {
+    public static class BlobStoreSpout extends BaseRichSpout {
         SpoutOutputCollector _collector;
         BlobStoreAPIWordCountTopology wc;
         String key;
@@ -105,7 +103,7 @@ public class BlobStoreAPIWordCountTopology {
 
         @Override
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word"));
+            declarer.declare(new Fields("sentence"));
         }
 
     }
@@ -152,7 +150,7 @@ public class BlobStoreAPIWordCountTopology {
 
         TopologyBuilder builder = new TopologyBuilder();
 
-        builder.setSpout("spout", new RandomSentenceSpout(), 5);
+        builder.setSpout("spout", new BlobStoreSpout(), 5);
 
         builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
         builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
@@ -238,7 +236,7 @@ public class BlobStoreAPIWordCountTopology {
                 Utils.sleep(100);
             }
         } catch (KeyAlreadyExistsException kae) {
-            // Do nothing
+            LOG.info("Key already exists {}", kae);
         } catch (AuthorizationException | KeyNotFoundException | IOException exp) {
             throw new RuntimeException(exp);
         }
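
The refactor swaps the hard-coded 0x01/0x02/0x04 permission bits for the BlobStoreAclHandler constants without changing behaviour. A tiny sketch of the intended equivalence (assuming the constants carry the same bit values the literals did, which is what the drop-in replacement implies):

    import backtype.storm.blobstore.BlobStoreAclHandler;
    import backtype.storm.generated.AccessControl;
    import backtype.storm.generated.AccessControlType;

    public class AclEquivalenceSketch {
        public static void main(String[] args) {
            // WORLD_EVERYTHING before the refactor: literal bit flags
            AccessControl before = new AccessControl(AccessControlType.OTHER, 0x01 | 0x02 | 0x04);
            // WORLD_EVERYTHING after the refactor: named constants from the ACL handler
            AccessControl after = new AccessControl(AccessControlType.OTHER,
                    BlobStoreAclHandler.READ | BlobStoreAclHandler.WRITE | BlobStoreAclHandler.ADMIN);
            System.out.println(before.equals(after));   // expected: true
        }
    }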