Posted to user@storm.apache.org by Michail Toutoudakis <mi...@gmail.com> on 2015/06/10 17:19:24 UTC

Controlling spout and bolt generation order?

Is there a way I can control the order in which spouts and bolts are created? I am using Hazelcast, a distributed data collection, and in the spout's open() method I generate some hash functions that the bolts will use. However, it looks like the bolts are initialized before the spout, i.e. before the hash functions are generated, so they end up with empty hash functions: when the UniversalBasicWindow object is constructed in the bolt's prepare(), it reads the Hazelcast collection, which is still empty at that point. My spout, bolt and UniversalBasicWindow class code follows:

package tuc.LSH.storm.spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import tuc.LSH.conf.Consts;
import tuc.LSH.core.hashfunctions.HashFunctionsGen;

import java.io.*;
import java.util.Map;
import java.util.Scanner;

/**
 * Created by mixtou on 15/5/15.
 */
public class FileReaderSpout extends BaseRichSpout {
//public class FileReaderSpout implements IRichSpout {

    private SpoutOutputCollector collector;
    private Scanner scanner;
    private boolean completed;
    private TopologyContext context;
    private int spout_idx;
    private int spout_id;
    private Map config;
    private int noOfFailedWords;
    private int noOfAckedWords;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("data", new Fields("streamId", "timestamp", "value"));


    }

    @Override
    public void open(Map config, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.context = topologyContext;
        this.spout_idx = context.getThisTaskIndex();
        this.spout_id = context.getThisTaskId();
        this.collector = spoutOutputCollector;
        this.config = config;
        this.completed = false;
        this.noOfFailedWords = 0;
        this.noOfAckedWords = 0;
        HashFunctionsGen.generateHashFunctionsForUBW(); //HazelCast HashFunction Generation
        HashFunctionsGen.size();
        System.err.println("Generating hashFunctions");

        try {
            this.scanner = new Scanner(new File(config.get(file_to_read()).toString()));
            System.err.println("Scanner Reading File: " + config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void nextTuple() {

        if (!completed) {
            if (scanner.hasNextLine()) {
                String[] temp = scanner.nextLine().split(",");
//            System.err.println("============== " + temp[0] + " + " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
                collector.emit("data", new Values(temp[0], temp[2], temp[3]), temp[0]); //emit on the "data" stream, using the stream id as the message id so acks/fails are reported back to this spout
//                Utils.sleep(1);
            } else {
                System.err.println("End of File Closing Reader");
                scanner.close();
                completed = true;
            }
        }

    }

    private String file_to_read() {
//        this.spout_id = context.getThisTaskId();
        if (Consts.NO_OF_SPOUTS > 1) {
            int file_no = spout_idx % Consts.NO_OF_SPOUTS;
            return "data" + file_no;
        } else {
            return "data";
        }
    }

    @Override
    public void ack(Object msgId) {
        super.ack(msgId);
        noOfAckedWords++;
//        System.out.println("OK tuple acked from bolt: " + msgId + " no of acked word " + noOfAckedWords);
//        System.out.println("no of acked tuples: "+noOfAckedWords);
    }

    @Override
    public void fail(Object msgId) {
        super.fail(msgId);
        noOfFailedWords++;
        System.err.println("ERROR: " + context.getThisComponentId() + " " + msgId + " no of words failed " + noOfFailedWords);

    }

}

package tuc.LSH.storm.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import tuc.LSH.conf.HazelCastConfig;
import tuc.LSH.core.timeseries.SlidingWindow;
import tuc.LSH.core.timeseries.UniversalBasicWindow;

import java.util.Map;
import java.util.Queue;

/**
 * Created by mixtou on 17/5/15.
 */
public class LSHBolt extends BaseRichBolt {
    private int task_id;
    private OutputCollector collector;
    private UniversalBasicWindow universalBasicWindow;
    private SlidingWindow slidingWindow;

    private String streamId;
    private String time;
    private Float value;

    @Override
    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.task_id = topologyContext.getThisTaskIndex();
        this.collector = outputCollector;
        streamId = null;
        time = null;
        value = 0f;
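        // The UniversalBasicWindow constructor below copies the hash functions from the Hazelcast queue at this point;
        // if no spout has run open() yet, that queue is still empty and the window gets no hash functions.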
        this.universalBasicWindow = new UniversalBasicWindow();
        this.slidingWindow = new SlidingWindow();
        System.err.println("New Bolt with id: " + task_id);

    }

    @Override
    public void execute(Tuple tuple) {

        if (tuple.getSourceStreamId().equals("sync")) {
//            System.out.println("Bolt task id " + task_id + " received from " + tuple.getSourceComponent() + " message " + tuple.getString(0));
//            System.out.println("Normalizing: Basic Window of Bolt " + task_id);
            universalBasicWindow.normalize(); //pad every stream with its last received value so all streams end up the same size
            slidingWindow.addBasicWindow(universalBasicWindow);
//            universalBasicWindow.print();
//            System.err.println("==Bolt: "+task_id);
//            System.out.println("adding basic window to sliding window");
//            System.err.println("Size: "+slidingWindow.getwindow().size());
//            slidingWindow.print();

//            System.out.println("Resetting Window Bolt "+task_id);
            universalBasicWindow.reset();
//            Utils.sleep(1);
//            collector.ack(tuple);

        } else if (tuple.getSourceStreamId().equals("data")) {

            streamId = tuple.getStringByField("streamId");
            time = tuple.getStringByField("timestamp");
            value = Float.parseFloat(tuple.getStringByField("value"));
//            System.out.println("From Bolt: "+task_id);
            universalBasicWindow.pushStream(streamId, value);

            if (universalBasicWindow.isFull()) { //check if any stream of the window is full

//                System.out.println("Univ. Basic Window of bolt " + task_id + " is Filled Up");

                collector.emit("bwFilled", new Values(task_id));
//                Utils.sleep(1);

            }
        }
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id"));

    }
}

//        System.err.println("SourceComponent: "+tuple.getSourceComponent());
//        System.err.println("SourceStreamId: "+tuple.getSourceStreamId());
//        System.err.println("Source Task: "+tuple.getSourceTask());
//        System.err.println("SourceGlobalStreamId: "+tuple.getSourceGlobalStreamid());
//        System.err.println("MessageId: "+tuple.getMessageId());



package tuc.LSH.core.timeseries;

import clojure.lang.Cons;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import tuc.LSH.conf.Consts;
import tuc.LSH.conf.HazelCastConfig;
import tuc.LSH.core.hashfunctions.HashFunctionsGen;

import java.util.*;

/**
 * Created by mixtou on 19/5/15.
 */
public class UniversalBasicWindow {

    private HashMap<String, StreamBasicWindow> allStreamsSignatures;
    private IQueue<boolean[]> hashFunctions;
    private LinkedList<boolean[]> localHashFunction;

    public  UniversalBasicWindow() {
//        HashFunctionsGen.generateHashFunctionsForUBW(); //generate new hashFunctions for new UniversalBasicWindow
        HazelcastInstance hz = Hazelcast.getOrCreateHazelcastInstance(new HazelCastConfig().getConfig());
        hashFunctions = hz.getQueue("hashFunctions");
        localHashFunction = new LinkedList<>();
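        // Copy whatever is currently in the distributed queue; this is empty if the hash functions have not been generated yet.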
        localHashFunction.addAll(hashFunctions);
        this.allStreamsSignatures = new HashMap<>();

        System.out.println("New Universal Basic Window");
        System.out.println("size: "+hashFunctions.size());
    }

    public void reset() {
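        // Note: this clears the shared Hazelcast queue as well as the local copy, so every window sees an empty queue afterwards.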
        localHashFunction.clear();
        hashFunctions.clear();
        clearValues();
    }

    public void clearValues() {

        for (Map.Entry<String, StreamBasicWindow> entry : allStreamsSignatures.entrySet()) {
            entry.getValue().clear();
//            System.err.println("na sou p: " +entry.getKey());
        }

    }

    public void pushStream(String streamId, float value) {

        StreamBasicWindow streamBasicWindow = allStreamsSignatures.get(streamId);

        if (streamBasicWindow == null) {
//            System.out.println("New Stream: " + streamId);
            streamBasicWindow = new StreamBasicWindow(streamId, localHashFunction);
            allStreamsSignatures.put(streamId, streamBasicWindow);
        }

        streamBasicWindow.pushValue(value);

//        System.out.println("Updated Basic Window of Stream: " + streamId+" at position "+streamBasicWindow.getCurrentPosition());

    }

    public HashMap<String, StreamBasicWindow> getAllStreamsSignatures() {
        return allStreamsSignatures;
    }


    public boolean isFull() {

        boolean reply = false;

        for (Map.Entry<String, StreamBasicWindow> entry : allStreamsSignatures.entrySet()) {
            if (entry.getValue().getCurrentPosition() == Consts.BASIC_WINDOW_SIZE) {
//                System.out.println("Bolt id "+task_id+" StreamId: " + entry.getKey() + " size " + entry.getValue().getSize() + " curr position: " + entry.getValue().getCurrentPosition());
                reply = true;
                break; //even if one full stream is found exit loop and return true;
            }
        }
        return reply;

    }

    public void print() {

        Iterator<Map.Entry<String, StreamBasicWindow>> iterator = allStreamsSignatures.entrySet().iterator();
//        for(Map.Entry<String, StreamBasicWindow> entry : allStreamsSignatures.entrySet()){
        System.out.println("====================================");
        while (iterator.hasNext()) {
            Map.Entry<String, StreamBasicWindow> entry = iterator.next(); //call next() only once per iteration; calling it twice skips entries and can throw NoSuchElementException
//            System.out.println("Stream Id: " + entry.getKey() + " size: " + entry.getValue().getCurrentPosition());
            System.err.println("SIGNATURES: " + entry.getValue().getStreamSignatures());
            System.err.println("SUMS: " + entry.getValue().getStreamSums());
        }
        System.out.println("====================================");
    }

    public void printLocalFunctions() {
        int temp = 0;
        for (boolean[] entry : localHashFunction) {
            temp++;
            for (int i = 0; i < entry.length; i++) {
                System.out.println("Window Local Functions: " + temp + " entry " + i + " value: " + entry[i]);
            }
        }
    }

    public void normalize() {

//        System.out.println("Normalizing");

        Iterator<Map.Entry<String, StreamBasicWindow>> iterator = allStreamsSignatures.entrySet().iterator();

        while (iterator.hasNext()) {
            Map.Entry<String, StreamBasicWindow> entry = iterator.next();
            if (entry.getValue().getCurrentPosition() < Consts.BASIC_WINDOW_SIZE) {
                for (int i = entry.getValue().getCurrentPosition(); i < Consts.BASIC_WINDOW_SIZE; i++) {
                    entry.getValue().pushValue(entry.getValue().getLastReceivedValue());
                }
            }
        }
    }


}

package tuc.LSH.core.hashfunctions;

import backtype.storm.utils.Utils;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import tuc.LSH.conf.Consts;
import tuc.LSH.conf.HazelCastConfig;

import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Lock;

/**
 * Created by mixtou on 14/5/15.
 */
public class HashFunctionsGen {

    static IQueue<boolean[]> hashFunctions;
    static Random rnd;

    public static boolean cosineFamilyValue() {
        return rnd.nextBoolean();
    }

    public static void generateHashFunctionsForUBW() {
        HazelcastInstance hz = Hazelcast.getOrCreateHazelcastInstance(new HazelCastConfig().getConfig());
        hashFunctions = hz.getQueue("hashFunctions");
        rnd = new Random();

        // Generate the required number of hash functions only once. The emptiness check is done
        // inside the lock so that two concurrent callers cannot both populate the queue.
        Lock lock = hz.getLock("hashFunctions");
        lock.lock();
        try {
            if (hashFunctions.isEmpty()) {
                for (int i = 0; i < Consts.NO_OF_HASH_FUNCTIONS; i++) {
                    boolean[] temp = new boolean[Consts.BASIC_WINDOW_SIZE];
                    for (int j = 0; j < Consts.BASIC_WINDOW_SIZE; j++) {
                        temp[j] = HashFunctionsGen.cosineFamilyValue();
                    }
                    hashFunctions.add(temp);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public static void size() {
        System.out.println("Size of queue: " + hashFunctions.size());
    }

    public static void print() {
        HazelcastInstance hz = Hazelcast.getOrCreateHazelcastInstance(new HazelCastConfig().getConfig());
        Queue<boolean[]> functions = hz.getQueue("hashFunctions");

        int temp = 0;
        for (boolean[] entry : functions) {
            temp++;
            for (int i = 0; i < entry.length; i++) {
                System.err.println("Hazelcast Function " + temp + " entry " + i + " value: " + entry[i]);
            }
        }
    }

    public static void clear() {
        HazelcastInstance hz = Hazelcast.getOrCreateHazelcastInstance(new HazelCastConfig().getConfig());
        hashFunctions = hz.getQueue("hashFunctions");
        Lock lock = hz.getLock("hashFunctions");
        lock.lock();
        try {
            hashFunctions.clear();
//                hashFunctions = hz.getQueue("hashFunctions");
        } finally {
            lock.unlock();
        }


    }


}


Re: Controlling spout and bolt generation order?

Posted by Michail Toutoudakis <mi...@gmail.com>.
I tried your suggestion and moved the Hazelcast instantiation into the bolts, but the problem remains exactly the same. I believe the problem is somewhere else in my code, and I can't figure out where.
If anyone has any other suggestions, please let me know.


RE: Controlling spout and bolt generation order?

Posted by Ravi Tandon <Ra...@microsoft.com>.
Hi Michail

Forgive me if I missed a detail of your scenario, but from what you described you don't need control over the creation order to make this work; there are other, simpler design alternatives. I understand that waiting in the bolt's prepare() for those functions to become available may not be the best option.
How about this:
A bolt's execute() method is only called after a spout has emitted, which means the spout is already up and running. You could have a small init function (doing what you currently do in prepare()) that checks a flag on each execute() and initializes your dependencies: a few extra CPU cycles to check a flag, but it guarantees the spout is up. I also prefer such functions when I interact with third-party services, because I can re-initialize after an exception (instead of crashing the bolt and losing state), fail the tuple, and handle it again (and crash the bolt only if errors exceed some threshold X).

This init can do more based on your task index or streams, since you may want to block on the particular hash functions meant for that stream. Worst case, if you ever have to wait on that state, only one tuple at startup or after a re-init pays the extra execute() cost; a minimal sketch of this lazy-init pattern is shown below.
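
Here is a minimal sketch of that lazy-initialization pattern, reusing the classes from the original post. LazyInitLSHBolt and ensureInitialized() are hypothetical names, and the body of execute() is elided to the original stream handling; this is an illustration of the idea, not the poster's actual code.

package tuc.LSH.storm.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import tuc.LSH.core.timeseries.SlidingWindow;
import tuc.LSH.core.timeseries.UniversalBasicWindow;

import java.util.Map;

public class LazyInitLSHBolt extends BaseRichBolt {

    private OutputCollector collector;
    private UniversalBasicWindow universalBasicWindow;
    private SlidingWindow slidingWindow;
    private boolean initialized = false; // flag checked on every execute()

    @Override
    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        // Deliberately do NOT read the Hazelcast queue here; no spout is guaranteed to have run open() yet.
    }

    // Lazy init: runs the expensive setup the first time a tuple arrives, i.e. only after
    // a spout has emitted and has therefore already generated the hash functions.
    private void ensureInitialized() {
        if (!initialized) {
            universalBasicWindow = new UniversalBasicWindow(); // the Hazelcast queue is populated by now
            slidingWindow = new SlidingWindow();
            initialized = true;
        }
    }

    @Override
    public void execute(Tuple tuple) {
        ensureInitialized(); // cheap flag check per tuple; the real initialization happens once
        // ... the same "sync" / "data" stream handling as in the original LSHBolt ...
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id"));
    }
}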

I'm not sure how others solve this problem; I would like to hear their opinions.

-Ravi
