You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by anshu shukla <an...@gmail.com> on 2015/06/15 21:23:15 UTC
Problem: Custom Receiver for getting events from a Dynamic Queue
I have written a custom receiver for converting the tuples in the Dynamic
Queue/EventGen to the Dstream.But i dont know why It is only processing
data for some time (3-4 sec.) only and then shows Queue as Empty .ANy
suggestions please ..>>
--code //
public class JavaCustomReceiver extends Receiver<String> implements
ISyntheticEventGen {
EventGen eventGen;
BlockingQueue<List<String>> eventQueue;
String csvFileName;
String outSpoutCSVLogFileName;
double scalingFactor;
public JavaCustomReceiver(String csvFileName, String
outSpoutCSVLogFileName, double scalingFactor) {
super(StorageLevel.MEMORY_AND_DISK());
this.csvFileName = csvFileName;
this.outSpoutCSVLogFileName = outSpoutCSVLogFileName;
this.scalingFactor = scalingFactor;
this.eventGen = new EventGen(this,this.scalingFactor);
this.eventGen.launch(this.csvFileName,
this.outSpoutCSVLogFileName); //Launch threads
this.eventQueue = new LinkedBlockingQueue<List<String>>();
System.out.println("for watching queue");
}
public void onStart() {
// Start the thread that receives data over a connection
new Thread() {
@Override public void run() {
receive();
}
}.start();
}
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private void receive() {
try {
// connect to the server
// socket = new Socket(host, port);
// BufferedReader reader = new BufferedReader(new
InputStreamReader(socket.getInputStream()));
// Until stopped or connection broken continue reading
while (!isStopped() ) {
List<String> entry = this.eventQueue.take();
String str="";
for(String s:entry)
str+=s+",";
System.out.println("Received data '" + str + "'");
store(str);
}
// Restart in an attempt to connect again when server is
active again
restart("Trying to connect again");
}
catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
@Override
public StorageLevel storageLevel() {
return StorageLevel.MEMORY_AND_DISK();
}
@Override
public void receive(List<String> event) {
// TODO Auto-generated method stub
//System.out.println("Called IN SPOUT### ");
try {
this.eventQueue.put(event);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
--
Thanks & Regards,
Anshu Shukla