You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Junfeng Chen <k-...@hotmail.com> on 2015/04/07 05:27:16 UTC

Cannot connect to socket in spout in cluster mode

I intend to establish Socket connection in spout. Here is my code :

@Override
public void nextTuple() {
    Socket socket = null;
    try {
        socket = serverSocket.accept();
    } catch (IOException e) {
        e.printStackTrace();
    }
    TCPReceiver tcpReceiver = new TCPReceiver(socket,inQueue);
    //disable thread to send the string immediately
    tcpReceiver.receiveData();
    if (!inQueue.isEmpty()){
        String readyString = inQueue.poll();
        _collector.emit(new Values(readyString));
    }
}

 

The nextTuple() is blocked until new socket is established. It works under
LocalCluster mode, but in production envirionment, tcp client cannot connect
to this socket server. I use the command “nc -l 20000”(my listening port
is 20000) while the program is running, it can receive the data from tcp
client. Can any one help me? Thanks 

 

Regard, 

Junfeng Chen(陈俊峰)

 


答复: Cannot connect to socket in spout in cluster mode

Posted by Junfeng Chen <k-...@hotmail.com>.
Thank you for helping. Now I have modified my code, it also doesn’t work. Here is my current code. 

RawMessageSpout.java

public class RawMessageSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    Queue<String> inQueue ;
    ServerSocket serverSocket;
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("message"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
       _collector=spoutOutputCollector;
        inQueue = new ConcurrentLinkedQueue<String>();
        TCPReceiver tcpReceiver = new TCPReceiver(inQueue);
        tcpReceiver.start();

    }

    @Override
    public void nextTuple() {
        //disable thread to send the string immediately

        if (!inQueue.isEmpty()){
            String readyString = inQueue.poll();
            _collector.emit(new Values(readyString));
        }
    }
}

 

TCPReceiver.java

package communication;


import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Queue;

/**
 * Created by cjf on 2014/9/15.
 */
public class TCPReceiver extends Thread {
    private static Logger logger = Logger.getLogger(TCPReceiver.class);
    private static final int BUFSIZE = 32;
    private static final int PORT = 20001;
    public Socket socket = null;
    public Queue<String> inQueue;
    ServerSocket serverSocket;

    public TCPReceiver(Queue inQueue) {
        //this.socket = socket;
        this.inQueue = inQueue;
        try {
            serverSocket= new ServerSocket(PORT);
        } catch (IOException e) {
            logger.error(PORT+" 端口已被占用");
            e.printStackTrace();
        }
    }

    /**
     * Package receiving method used for processing multi requests
     *
     * @param inQueue message in queue
     */
    public void multiReceivePackage(Queue<String> inQueue) {
        InputStream in = null;

        try {
            byte[] receiveBuf = new byte[BUFSIZE];
            //socket = server.accept();
            //System.out.println("Receive: Connected to socket:" + socket.getRemoteSocketAddress() + "...sending echo string");
            //System.out.println(socket.getRemoteSocketAddress());
            logger.info("Receive: Connected to socket:" + socket.getRemoteSocketAddress().toString() + "...sending echo string");
            in = socket.getInputStream();
            String temp = "";
            while (in.read(receiveBuf) != -1) {
                temp += new String(receiveBuf);
                receiveBuf = new byte[BUFSIZE];
                if (temp.indexOf("#E") >= 0) {
                    break;
                }
            }
            logger.info("Message received: " + temp.trim());

            inQueue.offer(temp.trim() + "|" + socket.getRemoteSocketAddress().toString());
            //System.out.println("Message and IP received: " + temp.trim() + "|" + socket.getRemoteSocketAddress().toString());
            logger.debug("Message and IP received: " + temp.trim() + "|" + socket.getRemoteSocketAddress().toString());
            socket.close();
            //Thread.sleep(500);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {

        }
    }


    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p/>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see Thread#run()
     */
    @Override
    public void run() {
        //System.out.println("TCPReceiver is running");
        logger.debug("TCPReceiver is running");
        //receivePackage(DataHandler.inQueue);
        while (true){
            try {
                socket = serverSocket.accept();
                multiReceivePackage(this.inQueue);
            } catch (Exception e) {
                //System.out.println("[" + Thread.currentThread().getName() + "] Receiver 线程内出错。");
                logger.error("[" + Thread.currentThread().getName() + "] Receiver 线程内出错。");
                e.printStackTrace();
            }
        }


    }

    public void receiveData(){
        //System.out.println("TCPReceiver is running");
        logger.debug("TCPReceiver is running");
        //receivePackage(DataHandler.inQueue);


        try {
            socket = serverSocket.accept();
            multiReceivePackage(this.inQueue);
        } catch (Exception e) {
            //System.out.println("[" + Thread.currentThread().getName() + "] Receiver 线程内出错。");
            logger.error("[" + Thread.currentThread().getName() + "] Receiver 线程内出错。");
            e.printStackTrace();
        }
    }
}

 

 

发件人: 임정택 [mailto:kabhwan@gmail.com] 
发送时间: 2015年4月7日 11:44
收件人: user@storm.apache.org
主题: Re: Cannot connect to socket in spout in cluster mode

 

Hello.

 

Maybe it's up to serverSocket. Could you expose whole code for verifying?

 

Thanks

 

Regard.

Jungtaek Lim (HeartSaVioR)

 

 

2015-04-07 12:27 GMT+09:00 Junfeng Chen <k-2feng@hotmail.com <ma...@hotmail.com> >:

I intend to establish Socket connection in spout. Here is my code :

@Override
public void nextTuple() {
    Socket socket = null;
    try {
        socket = serverSocket.accept();
    } catch (IOException e) {
        e.printStackTrace();
    }
    TCPReceiver tcpReceiver = new TCPReceiver(socket,inQueue);
    //disable thread to send the string immediately
    tcpReceiver.receiveData();
    if (!inQueue.isEmpty()){
        String readyString = inQueue.poll();
        _collector.emit(new Values(readyString));
    }
}

 

The nextTuple() is blocked until new socket is established. It works under LocalCluster mode, but in production envirionment, tcp client cannot connect to this socket server. I use the command “nc -l 20000”(my listening port is 20000) while the program is running, it can receive the data from tcp client. Can any one help me? Thanks 

 

Regard, 

Junfeng Chen(陈俊峰)

 





 

-- 

Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior

LinkedIn : http://www.linkedin.com/in/heartsavior


Re: Cannot connect to socket in spout in cluster mode

Posted by 임정택 <ka...@gmail.com>.
Hello.

Maybe it's up to serverSocket. Could you expose whole code for verifying?

Thanks

Regard.
Jungtaek Lim (HeartSaVioR)


2015-04-07 12:27 GMT+09:00 Junfeng Chen <k-...@hotmail.com>:

> I intend to establish Socket connection in spout. Here is my code :
>
> @Override
> public void nextTuple() {
>     Socket socket = null;
>     try {
>         socket = serverSocket.accept();
>     } catch (IOException e) {
>         e.printStackTrace();
>     }
>     TCPReceiver tcpReceiver = new TCPReceiver(socket,inQueue);
>     //disable thread to send the string immediately
>     tcpReceiver.receiveData();
>     if (!inQueue.isEmpty()){
>         String readyString = inQueue.poll();
>         _collector.emit(new Values(readyString));
>     }
> }
>
>
>
> The nextTuple() is blocked until new socket is established. It works under
> LocalCluster mode, but in production envirionment, tcp client cannot
> connect to this socket server. I use the command “nc -l 20000”(my listening
> port is 20000) while the program is running, it can receive the data from
> tcp client. Can any one help me? Thanks
>
>
>
> Regard,
>
> *Junfeng Chen**(陈俊峰)*
>
>
>



-- 
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior