You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Junfeng Chen <k-...@hotmail.com> on 2015/04/07 05:27:16 UTC
Cannot connect to socket in spout in cluster mode
I intend to establish Socket connection in spout. Here is my code :
@Override
public void nextTuple() {
Socket socket = null;
try {
socket = serverSocket.accept();
} catch (IOException e) {
e.printStackTrace();
}
TCPReceiver tcpReceiver = new TCPReceiver(socket,inQueue);
//disable thread to send the string immediately
tcpReceiver.receiveData();
if (!inQueue.isEmpty()){
String readyString = inQueue.poll();
_collector.emit(new Values(readyString));
}
}
The nextTuple() is blocked until new socket is established. It works under
LocalCluster mode, but in production envirionment, tcp client cannot connect
to this socket server. I use the command “nc -l 20000”(my listening port
is 20000) while the program is running, it can receive the data from tcp
client. Can any one help me? Thanks
Regard,
Junfeng Chen(陈俊峰)
答复: Cannot connect to socket in spout in cluster mode
Posted by Junfeng Chen <k-...@hotmail.com>.
Thank you for helping. Now I have modified my code, it also doesn’t work. Here is my current code.
RawMessageSpout.java
public class RawMessageSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
Queue<String> inQueue ;
ServerSocket serverSocket;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("message"));
}
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
_collector=spoutOutputCollector;
inQueue = new ConcurrentLinkedQueue<String>();
TCPReceiver tcpReceiver = new TCPReceiver(inQueue);
tcpReceiver.start();
}
@Override
public void nextTuple() {
//disable thread to send the string immediately
if (!inQueue.isEmpty()){
String readyString = inQueue.poll();
_collector.emit(new Values(readyString));
}
}
}
TCPReceiver.java
package communication;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Queue;
/**
* Created by cjf on 2014/9/15.
*/
public class TCPReceiver extends Thread {
private static Logger logger = Logger.getLogger(TCPReceiver.class);
private static final int BUFSIZE = 32;
private static final int PORT = 20001;
public Socket socket = null;
public Queue<String> inQueue;
ServerSocket serverSocket;
public TCPReceiver(Queue inQueue) {
//this.socket = socket;
this.inQueue = inQueue;
try {
serverSocket= new ServerSocket(PORT);
} catch (IOException e) {
logger.error(PORT+" 端口已被占用");
e.printStackTrace();
}
}
/**
* Package receiving method used for processing multi requests
*
* @param inQueue message in queue
*/
public void multiReceivePackage(Queue<String> inQueue) {
InputStream in = null;
try {
byte[] receiveBuf = new byte[BUFSIZE];
//socket = server.accept();
//System.out.println("Receive: Connected to socket:" + socket.getRemoteSocketAddress() + "...sending echo string");
//System.out.println(socket.getRemoteSocketAddress());
logger.info("Receive: Connected to socket:" + socket.getRemoteSocketAddress().toString() + "...sending echo string");
in = socket.getInputStream();
String temp = "";
while (in.read(receiveBuf) != -1) {
temp += new String(receiveBuf);
receiveBuf = new byte[BUFSIZE];
if (temp.indexOf("#E") >= 0) {
break;
}
}
logger.info("Message received: " + temp.trim());
inQueue.offer(temp.trim() + "|" + socket.getRemoteSocketAddress().toString());
//System.out.println("Message and IP received: " + temp.trim() + "|" + socket.getRemoteSocketAddress().toString());
logger.debug("Message and IP received: " + temp.trim() + "|" + socket.getRemoteSocketAddress().toString());
socket.close();
//Thread.sleep(500);
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p/>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
//System.out.println("TCPReceiver is running");
logger.debug("TCPReceiver is running");
//receivePackage(DataHandler.inQueue);
while (true){
try {
socket = serverSocket.accept();
multiReceivePackage(this.inQueue);
} catch (Exception e) {
//System.out.println("[" + Thread.currentThread().getName() + "] Receiver 线程内出错。");
logger.error("[" + Thread.currentThread().getName() + "] Receiver 线程内出错。");
e.printStackTrace();
}
}
}
public void receiveData(){
//System.out.println("TCPReceiver is running");
logger.debug("TCPReceiver is running");
//receivePackage(DataHandler.inQueue);
try {
socket = serverSocket.accept();
multiReceivePackage(this.inQueue);
} catch (Exception e) {
//System.out.println("[" + Thread.currentThread().getName() + "] Receiver 线程内出错。");
logger.error("[" + Thread.currentThread().getName() + "] Receiver 线程内出错。");
e.printStackTrace();
}
}
}
发件人: 임정택 [mailto:kabhwan@gmail.com]
发送时间: 2015年4月7日 11:44
收件人: user@storm.apache.org
主题: Re: Cannot connect to socket in spout in cluster mode
Hello.
Maybe it's up to serverSocket. Could you expose whole code for verifying?
Thanks
Regard.
Jungtaek Lim (HeartSaVioR)
2015-04-07 12:27 GMT+09:00 Junfeng Chen <k-2feng@hotmail.com <ma...@hotmail.com> >:
I intend to establish Socket connection in spout. Here is my code :
@Override
public void nextTuple() {
Socket socket = null;
try {
socket = serverSocket.accept();
} catch (IOException e) {
e.printStackTrace();
}
TCPReceiver tcpReceiver = new TCPReceiver(socket,inQueue);
//disable thread to send the string immediately
tcpReceiver.receiveData();
if (!inQueue.isEmpty()){
String readyString = inQueue.poll();
_collector.emit(new Values(readyString));
}
}
The nextTuple() is blocked until new socket is established. It works under LocalCluster mode, but in production envirionment, tcp client cannot connect to this socket server. I use the command “nc -l 20000”(my listening port is 20000) while the program is running, it can receive the data from tcp client. Can any one help me? Thanks
Regard,
Junfeng Chen(陈俊峰)
--
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
Re: Cannot connect to socket in spout in cluster mode
Posted by 임정택 <ka...@gmail.com>.
Hello.
Maybe it's up to serverSocket. Could you expose whole code for verifying?
Thanks
Regard.
Jungtaek Lim (HeartSaVioR)
2015-04-07 12:27 GMT+09:00 Junfeng Chen <k-...@hotmail.com>:
> I intend to establish Socket connection in spout. Here is my code :
>
> @Override
> public void nextTuple() {
> Socket socket = null;
> try {
> socket = serverSocket.accept();
> } catch (IOException e) {
> e.printStackTrace();
> }
> TCPReceiver tcpReceiver = new TCPReceiver(socket,inQueue);
> //disable thread to send the string immediately
> tcpReceiver.receiveData();
> if (!inQueue.isEmpty()){
> String readyString = inQueue.poll();
> _collector.emit(new Values(readyString));
> }
> }
>
>
>
> The nextTuple() is blocked until new socket is established. It works under
> LocalCluster mode, but in production envirionment, tcp client cannot
> connect to this socket server. I use the command “nc -l 20000”(my listening
> port is 20000) while the program is running, it can receive the data from
> tcp client. Can any one help me? Thanks
>
>
>
> Regard,
>
> *Junfeng Chen**(陈俊峰)*
>
>
>
--
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior