You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Li Peng <li...@elementanalytics.com> on 2017/01/31 03:15:02 UTC

Connection refused error when writing to socket?

Hi there,

I'm trying to test a couple of things by having my stream write to a
socket, but it keeps failing to connect (I'm trying to have a stream
write to a socket, and have another stream read from that socket).

Caused by: java.net.ConnectException: Connection refused (Connection refused)

at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.run(Thread.java:745)

I tried writeToSocket(parameterTool.get("localhost"),
parameterTool.getInt(9000), new SimpleStringSchema), and even a custom
sink:

addSink(w => {
  val ia = InetAddress.getByName("localhost")
  val socket = new Socket(ia, 9000)
  val outStream = socket.getOutputStream
  val out = new PrintWriter(new BufferedWriter(new
OutputStreamWriter(outStream)))
  out.println(w)
  out.flush()
  out.close()
})

But none of this seem to work. I'm fairly sure I setup the server
correctly since I can connect it to via a telnet and other dummy
echoclients I wrote. I can also have my data stream read from that
same socket without any issues, but I can't seem to tell my stream to
write to this socket without the above connection refused error
showing up. Is there some nuance here I'm missing?

Thanks!

Re: Connection refused error when writing to socket?

Posted by Till Rohrmann <tr...@apache.org>.
Hi Li Peng,

I think what you're trying to do won't work. The problem is that you have
two TCP clients (sink and source) which are supposed to connect to each
other. Without a server which buffers the incoming data and forwards it to
the outgoing connections, it won't be possible to read the previously
written data.

The exception you're observing originates from the fact that the source
tries to connect to the TCP port 9000 which is not open (since there is no
server listening on this port). The same would happen to the sink.

Cheers,
Till

On Tue, Jan 31, 2017 at 9:04 PM, Li Peng <li...@elementanalytics.com>
wrote:

> Yes I did open a socket with netcat. Turns out my first error was due
> to a stream without a sink triggering the socket connect and (I
> thought that without a sink the stream wouldn't affect anything so I
> didn't comment it out, and I didn't open the socket for that port).
> However
>
> I did play with it some more and I think the real issue is that I'm
> trying to have two streams, one write to a port and another read from
> the same port. i.e.
>
> val y = executionEnvironment.socketTextStream("localhost", 9000)
> x.writeToSocket("localhost", 9000, new SimpleStringSchema())
>
> Once I tested just write or just the read it worked, but combined I
> get this error:
>
> java.net.SocketException: Connection reset
> at java.net.SocketInputStream.read(SocketInputStream.java:210)
> at java.net.SocketInputStream.read(SocketInputStream.java:141)
> at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
> at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
> at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
> at java.io.InputStreamReader.read(InputStreamReader.java:184)
> at java.io.BufferedReader.read1(BufferedReader.java:210)
> at java.io.BufferedReader.read(BufferedReader.java:286)
> at java.io.Reader.read(Reader.java:140)
> at org.apache.flink.streaming.api.functions.source.
> SocketTextStreamFunction.run(SocketTextStreamFunction.java:101)
> at org.apache.flink.streaming.api.operators.StreamSource.
> run(StreamSource.java:80)
> at org.apache.flink.streaming.api.operators.StreamSource.
> run(StreamSource.java:53)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(
> SourceStreamTask.java:56)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:267)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
> at java.lang.Thread.run(Thread.java:745)
>
> Is this operation not allowed?
>
> And I'm mainly writing to the same socket in order to pass work back
> and forth between streams.
>

Re: Connection refused error when writing to socket?

Posted by Li Peng <li...@elementanalytics.com>.
Yes I did open a socket with netcat. Turns out my first error was due
to a stream without a sink triggering the socket connect and (I
thought that without a sink the stream wouldn't affect anything so I
didn't comment it out, and I didn't open the socket for that port).
However

I did play with it some more and I think the real issue is that I'm
trying to have two streams, one write to a port and another read from
the same port. i.e.

val y = executionEnvironment.socketTextStream("localhost", 9000)
x.writeToSocket("localhost", 9000, new SimpleStringSchema())

Once I tested just write or just the read it worked, but combined I
get this error:

java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.read1(BufferedReader.java:210)
at java.io.BufferedReader.read(BufferedReader.java:286)
at java.io.Reader.read(Reader.java:140)
at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:101)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.run(Thread.java:745)

Is this operation not allowed?

And I'm mainly writing to the same socket in order to pass work back
and forth between streams.

Re: Connection refused error when writing to socket?

Posted by Jonas <jo...@huntun.de>.
Can you try opening a socket with netcat on localhost?

nc -lk 9000

and see it this works? For me this works.

-- Jonas



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Connection-refused-error-when-writing-to-socket-tp11372p11376.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.