You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by anshu shukla <an...@gmail.com> on 2015/06/15 21:23:15 UTC

Problem: Custom Receiver for getting events from a Dynamic Queue

I have written a custom receiver that converts the tuples in the Dynamic
Queue/EventGen into a DStream. But I don't know why it only processes
data for a short time (3-4 sec.) and then shows the queue as empty. Any
suggestions, please?

--code //


public class JavaCustomReceiver extends Receiver<String> implements
ISyntheticEventGen {


    EventGen eventGen;
    BlockingQueue<List<String>> eventQueue;
    String csvFileName;
    String outSpoutCSVLogFileName;
    double scalingFactor;

    public JavaCustomReceiver(String csvFileName, String
outSpoutCSVLogFileName, double scalingFactor) {
        super(StorageLevel.MEMORY_AND_DISK());

        this.csvFileName = csvFileName;
        this.outSpoutCSVLogFileName = outSpoutCSVLogFileName;
        this.scalingFactor = scalingFactor;

        this.eventGen = new EventGen(this,this.scalingFactor);
        this.eventGen.launch(this.csvFileName,
this.outSpoutCSVLogFileName); //Launch threads


        this.eventQueue = new LinkedBlockingQueue<List<String>>();
        System.out.println("for watching queue");
    }

    public void onStart() {
        // Start the thread that receives data over a connection
        new Thread()  {
            @Override public void run() {
                receive();
            }
        }.start();
    }

    public void onStop() {
        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself isStopped() returns false
    }

    /** Create a socket connection and receive data until receiver is stopped */
    private void receive() {

        try {
            // connect to the server
//            socket = new Socket(host, port);

//            BufferedReader reader = new BufferedReader(new
InputStreamReader(socket.getInputStream()));

            // Until stopped or connection broken continue reading
            while (!isStopped() ) {
                         List<String> entry = this.eventQueue.take();

                String str="";
                for(String s:entry)
                str+=s+",";
                System.out.println("Received data '" + str + "'");
                store(str);

            }
            // Restart in an attempt to connect again when server is
active again
            restart("Trying to connect again");
        }
        catch(Throwable t) {
            // restart if there is any other error
            restart("Error receiving data", t);
        }
    }

    @Override
    public StorageLevel storageLevel() {
        return StorageLevel.MEMORY_AND_DISK();
    }


    @Override
    public void receive(List<String> event) {
        // TODO Auto-generated method stub
        //System.out.println("Called IN SPOUT### ");
        try {
            this.eventQueue.put(event);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}


-- 
Thanks & Regards,
Anshu Shukla