Posted to user@flink.apache.org by Márton Balassi <mb...@apache.org> on 2015/03/11 16:37:41 UTC

Fwd: Flink questions

Dear Emmanuel,

I'm Marton, one of the Flink Streaming developers - Robert forwarded your
issue to me. Thanks for trying out our project.

1) Debugging: TaskManager logs are currently not forwarded to the UI, but
you can find them on the taskmanager machines in the log folder of your
Flink distribution. Making them accessible from the UI is on our agenda
for the very near future.

2) Output to socket: Currently we do not have a preimplemented sink for
sockets (although we offer a socket source and sinks writing to Apache
Kafka, Flume and RabbitMQ). You can easily implement a socket sink by
extending the abstract RichSinkFunction class though. [1]

To use it, you can simply call dataStream.addSink(new MySinkFunction()) -
inside that function you can bring up a socket or any other service. You
would create the socket in the open() method and then write every value out
to it in the invoke() method.
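
The open/invoke/close lifecycle described above can be tried out in plain
Java, without a Flink dependency. This is only a minimal sketch: the class
LineSocketSink is a hypothetical stand-in for a sink extending
RichSinkFunction, and the helper sendAndCollect (also made up for this
example) starts a local listener so the result can be checked.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SocketSinkLifecycleSketch {

    /** Hypothetical stand-in for a rich sink: open once, invoke per record, close once. */
    static class LineSocketSink {
        private final String host;
        private final int port;
        private Socket socket;
        private PrintWriter out;

        LineSocketSink(String host, int port) {
            this.host = host;
            this.port = port;
        }

        /** Creates the connection; in Flink this would happen on the worker. */
        void open() throws IOException {
            socket = new Socket(host, port);
            out = new PrintWriter(socket.getOutputStream(), true); // autoflush on println
        }

        /** Writes one record as a line of text. */
        void invoke(String value) {
            out.println(value);
        }

        /** Releases the writer and the socket. */
        void close() throws IOException {
            out.close();
            socket.close();
        }
    }

    /** Sends the records through the sink to a local listener and returns what arrived. */
    static List<String> sendAndCollect(List<String> records) {
        List<String> received = new ArrayList<>();
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) { // port 0: any free port
            Thread reader = new Thread(() -> {
                try (Socket s = server.accept();
                     BufferedReader in = new BufferedReader(
                             new InputStreamReader(s.getInputStream()))) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        received.add(line);
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            reader.start();

            LineSocketSink sink =
                    new LineSocketSink(loopback.getHostAddress(), server.getLocalPort());
            sink.open();
            for (String record : records) {
                sink.invoke(record);
            }
            sink.close();
            reader.join(); // wait until the listener has seen end-of-stream
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(sendAndCollect(Arrays.asList("first", "second", "third")));
    }
}
```

In a real job the same three steps would live in the open(), invoke() and
close() methods of the sink function, and the listener would run on another
machine.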

I do agree that this is a nice tool to have so I have opened a JIRA ticket
for it. [2]

3) Internal data format: Robert was kind enough to offer a more detailed
answer on this issue. In general streaming sinks support any file output
that is supported by batch Flink including Avro. You can use this
functionality by dataStream.addSink(new FileSinkFunction<>(OutputFormat)).

[1]
http://ci.apache.org/projects/flink/flink-docs-master/streaming_guide.html#connecting-to-the-outside-world
[2] https://issues.apache.org/jira/browse/FLINK-1688

Best,

Marton

*From:* Emmanuel <el...@msn.com>
*Date:* 11 March 2015 14:59:31 CET
*To:* Robert Metzger <rm...@apache.org>, Henry Saputra <
hsaputra@apache.org>
*Subject:* *Flink questions*

Hello,



Thanks again for the help yesterday: the simple things go a long way to get
me moving...

I have more questions that I hope I can get your opinion and input on:

*Debugging:*
What's the preferred or recommended way to proceed?
I have been using some System.out.println() statements in my simple test
code, and the results are confusing:
First, in the UI, the logs shown are for jobmanager.out, but there is never
anything there; wherever I see output in a log, it's in the taskmanager.out
file.
Also, even more confusing is the fact that oftentimes I just get no log at
all... the UI says the topology is running, but nothing gets printed out...
Is there a process you'd recommend to follow to debug properly with logs?

*Output to socket*
Ideally I'd like to print out to a socket/stream and read from another
machine so as not to choke the node with disk I/O when testing
performance. Not sure how to do that.

*Internal Data format*
Finally, a practical question about data format: we ingest JSON, which is
not convenient, and uses a lot of space. Internally Java/Scala prefers
Tuples, and we were thinking of using Protocol Buffers.
There is also Avro that could do this as I understand it... What would be
the recommended way to format data internally?

Thanks for your input.

Cheers
Emmanuel

RE: Socket output stream

Posted by Emmanuel <el...@msn.com>.
Thanks...
This is what I came up with (note that I only print every 100,000th record, or at least that is the intent; otherwise I see quite a drop in performance).
I hope it can help others too, although there is probably room for improvement.
Cheers.
package org.myorg.quickstart;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;

public class SocketSink<IN> extends RichSinkFunction<IN> {

    private static final Logger LOG = LoggerFactory.getLogger(SocketSink.class);

    // Only every SAMPLE_INTERVAL-th record is written, to limit the performance hit.
    private static final long SAMPLE_INTERVAL = 100000L;

    // Serializable settings, shipped to the worker together with the function.
    private final String host;
    private final int port;

    // Not serializable: created in open() on the worker, hence transient.
    private transient Socket clientSocket;
    private transient PrintWriter out;

    // Per-instance counter (a static counter would be shared by all sinks in one JVM).
    private transient long count;

    public SocketSink(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void open(Configuration config) {
        try {
            clientSocket = new Socket(host, port);
            // autoflush = true: every println() is flushed to the socket
            out = new PrintWriter(clientSocket.getOutputStream(), true);
            count = 0;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void invoke(IN value) {
        if (count % SAMPLE_INTERVAL == 0) {
            out.println(value);
            // PrintWriter swallows IOExceptions, so ask it explicitly.
            if (out.checkError()) {
                LOG.warn("Could not write record to socket");
            }
        }
        count++;
    }

    @Override
    public void close() {
        LOG.info("Closing socket connection");
        if (out != null) {
            out.close();
        }
        try {
            if (clientSocket != null) {
                clientSocket.close();
            }
        } catch (IOException e) {
            LOG.warn("Error while closing socket", e);
        }
    }

    // No @Override here: the annotation was commented out in the original post.
    public void cancel() {
        close();
    }

}

Re: Socket output stream

Posted by Fabian Hueske <fh...@gmail.com>.
It is in AbstractRichFunction [1].

RichSinkFunction extends AbstractRichFunction:
public abstract class RichSinkFunction<IN> extends AbstractRichFunction
implements SinkFunction<IN>

Best, Fabian

[1]
https://github.com/apache/flink/blob/583c527fc3fc693dd40b908d969f1e510ff7dfb3/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java

2015-03-12 1:28 GMT+01:00 Emmanuel <el...@msn.com>:

> I don't see an 'open()' function to override in the RichSinkFunction or
> the SinkFunction... so where is this open() function supposed to be?

RE: Socket output stream

Posted by Emmanuel <el...@msn.com>.
I don't see an 'open()' function to override in the RichSinkFunction or the SinkFunction... so where is this open() function supposed to be?

Re: Socket output stream

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Emmanuel,

the open() method should be the right place for setting up the socket
connection. It is called on the worker node before the first input arrives.
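
The reason open() is the right place can be illustrated without Flink. The
sketch below assumes that the function object is shipped to the workers via
standard Java serialization, which is what the 'Not Serializable' error in
this thread suggests. EagerSink and LazySink are hypothetical classes made up
for this example; a PrintWriter stands in for the socket resources.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;

public class TransientFieldSketch {

    /** Creates the writer up front and keeps it in an ordinary field. */
    static class EagerSink implements Serializable {
        private final PrintWriter out = new PrintWriter(new StringWriter());
    }

    /** Keeps only serializable settings; the writer is transient and created in open(). */
    static class LazySink implements Serializable {
        private final String host;
        private final int port;
        private transient PrintWriter out;

        LazySink(String host, int port) {
            this.host = host;
            this.port = port;
        }

        /** Would connect to host:port in a real sink; here it only creates a writer. */
        void open() {
            out = new PrintWriter(new StringWriter());
        }
    }

    /** Returns true if Java serialization accepts the object. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // some non-transient field is not Serializable
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("eager sink serializable: " + canSerialize(new EagerSink()));
        System.out.println("lazy sink serializable:  " + canSerialize(new LazySink("localhost", 9999)));
    }
}
```

Marking the resource fields transient and creating them in open() keeps the
function object itself serializable, since only the host and port travel with
it.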

Best, Fabian

Socket output stream

Posted by Emmanuel <el...@msn.com>.
Hi Marton,
Thanks for the info.
I've been trying to implement a socket sink but am running into 'Not Serializable' kind of issues.
I was seeing in the Spark docs that this is typically an issue, where the socket should be created on the worker node, as it can't be serialized to be moved from the supervisor.
http://spark.apache.org/docs/1.1.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
So, I'm not sure how this would be implemented in Flink...
My attempt (maybe very naive) looked like this:
public static final class SocketSink extends RichSinkFunction<String> {
    
    private PrintWriter out;

    public SocketSink(String host, Integer port) throws IOException {
        Socket clientSocket = new Socket(host,port);
        out = new PrintWriter(clientSocket.getOutputStream(), true);
    }

    @Override
    public void invoke(String s) {
        out.println(s);
    }
}

Maybe I should just move to Kafka directly... ;/
Thanks for the help
Emmanuel

Re: Flink questions

Posted by Robert Metzger <rm...@apache.org>.
So regarding your last question, the internal data format:

Flink is internally (for sending data from one operator to another) using
its own data serialization framework, which is very efficient for
Tuples/Case classes etc.
So you as a user do not need to care about that much. If other components
of your infrastructure (application servers for example) rely on generated
classes (for example created by protobuf or Avro), you can use these
classes with Flink as well.

If you are planning to write some of the data you're streaming to a
persistent storage (for example HDFS or a local file system), then I would
recommend a format like Avro.

Regarding the incoming data: JSON is indeed a bit space inefficient when
represented as a string. If you are able to change the format of the
incoming data, you can also write your own data source (implementing the
SourceFunction interface).
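
To get a feeling for the space overhead of JSON strings mentioned above, here
is a small plain-Java comparison. It is only a rough sketch: the record
layout (id, value, count) is made up for this example, and the fixed-width
binary encoding is neither Avro nor Flink's internal serialization format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class EncodingSizeSketch {

    /** Size in bytes of the record encoded as a UTF-8 JSON string. */
    static int jsonSize(long id, double value, int count) {
        String json = "{\"id\":" + id + ",\"value\":" + value + ",\"count\":" + count + "}";
        return json.getBytes(StandardCharsets.UTF_8).length;
    }

    /** Size in bytes of the same fields as fixed-width binary (8 + 8 + 4 bytes). */
    static int binarySize(long id, double value, int count) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id);
            out.writeDouble(value);
            out.writeInt(count);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        long id = 1234567890123L;
        double value = 98.6;
        int count = 42;
        System.out.println("JSON bytes:   " + jsonSize(id, value, count));
        System.out.println("binary bytes: " + binarySize(id, value, count));
    }
}
```

The JSON form repeats the field names in every record, which is a large part
of the difference; schema-based formats such as Avro avoid that by keeping
the schema separate from the data.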



