Posted to user@flink.apache.org by Arvid Heise <ar...@ververica.com> on 2020/05/05 12:37:43 UTC

Re: Reading from sockets using dataset api

Hi Kaan,

explicitly mapping to physical nodes is currently not supported and would
need some workarounds. I have re-added the user mailing list (please always
include it in your responses); maybe someone there can help you with that.
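
One possible workaround, as a minimal sketch only: instead of relying on task placement, list every generator endpoint explicitly and let each parallel subtask take its share round-robin. Every socket is then read exactly once, but a subtask is not guaranteed to read from its own node. `EndpointAssignment`, its methods, the host names, and the base port 5555 are hypothetical; in a Flink rich function the subtask index would come from `getRuntimeContext().getIndexOfThisSubtask()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: splits generator endpoints across parallel subtasks. */
final class EndpointAssignment {

    /** Builds "host:port" strings for every port on every generator host. */
    static List<String> allEndpoints(List<String> hosts, int basePort, int portsPerHost) {
        List<String> endpoints = new ArrayList<>();
        for (String host : hosts) {
            for (int i = 0; i < portsPerHost; i++) {
                endpoints.add(host + ":" + (basePort + i));
            }
        }
        return endpoints;
    }

    /** Round-robin: subtask k takes endpoints k, k + parallelism, k + 2 * parallelism, ... */
    static List<String> endpointsForSubtask(List<String> endpoints, int subtaskIndex, int parallelism) {
        List<String> mine = new ArrayList<>();
        for (int i = subtaskIndex; i < endpoints.size(); i += parallelism) {
            mine.add(endpoints.get(i));
        }
        return mine;
    }

    public static void main(String[] args) {
        // Assumed setup: two generator nodes with 24 ports each, parallelism 48.
        List<String> endpoints = allEndpoints(Arrays.asList("gen_node1", "gen_node2"), 5555, 24);
        System.out.println(endpointsForSubtask(endpoints, 0, 48));
    }
}
```

With a parallelism equal to the number of endpoints, each subtask reads exactly one socket; with a lower parallelism, some subtasks read several sockets one after another.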

Best,

Arvid

On Thu, Apr 30, 2020 at 10:12 AM Kaan Sancak <ka...@gmail.com> wrote:

> One quick question though: on each generator node I am opening 24 sockets
> (the number of cores that I have). Is there a way to guarantee that, while
> running the map function, each slave node distributes these 24 socket ports
> among its task slots (each slave also has 24 slots)?
> Sorry, I have asked a lot of questions.
>
> Stay safe!
> Kaan
>
> On Thu, Apr 30, 2020 at 3:06 AM Kaan Sancak <ka...@gmail.com> wrote:
>
>> Hi Arvid,
>> As you said, I am only interested in batch processing right now. And it
>> seems to be working fine now.
>>
>> Thanks for your help!
>> Best
>> Kaan
>>
>> On Thu, Apr 30, 2020 at 2:31 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Kaan,
>>>
>>> not entirely sure I understand your solution. I gathered that you create
>>> a dataset of TCP addresses and then use flatMap to fetch and output the
>>> data?
>>>
>>> If so, then I think it's a good solution for batch processing (DataSet).
>>> It doesn't work in DataStream because it doesn't play well with
>>> checkpointing, but you seem to be interested only in batch. It's also not
>>> the first time I have seen this pattern being used in batch.
>>>
>>> In general, if it works and is fast enough, it's always a good solution
>>> ;). No need to make it more complicated if you can solve it with simpler
>>> means and you can maintain it more easily.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Wed, Apr 29, 2020 at 10:19 PM Kaan Sancak <ka...@gmail.com> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> I have implemented a ZMQ listener class without extending any Flink
>>>> class.
>>>> The listener has a constructor that takes the port number.
>>>>
>>>> Then in the execution I created a dataset of strings that holds the
>>>> port numbers.
>>>> Then I used a flatMap function, which returned Tuple2<Long, Long>. I
>>>> opened the TCP sockets using localhost, so matching was quite easy.
>>>>
>>>> This seemed to work for me. What do you think about this
>>>> implementation? Do you see any drawbacks?
>>>>
>>>> Best
>>>> Kaan
>>>>
>>>> On Apr 29, 2020, at 7:40 AM, Arvid Heise <ar...@ververica.com> wrote:
>>>>
>>>> Hi Kaan,
>>>>
>>>> seems like ZMQ is using TCP and not HTTP. So I guess the easiest way
>>>> would be to use a ZMQ Java binding to access it [1].
>>>>
>>>> But of course, it's much more complicated to write an iterator logic
>>>> for that. Not sure how ZMQ signals the end of such a graph? Maybe it closes
>>>> the socket and you can just read as much as possible.
>>>>
>>>> [1] https://zeromq.org/languages/java/
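
The "read as much as possible until the socket closes" idea can be sketched with the standard library alone. This assumes the generator writes newline-delimited `src;dst` text over a plain TCP socket; ZMQ sockets use their own framing on the wire, so with ZMQ the read loop would go through a ZMQ binding rather than `java.net.Socket`. `SocketEdgeReader`, `parseEdge`, and `readEdges` are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reader: pulls "src;dst" lines from a plain TCP socket until EOF. */
final class SocketEdgeReader {

    /** Parses one "src;dst" line into a two-element array {src, dst}. */
    static long[] parseEdge(String line) {
        String[] parts = line.split(";");
        return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
    }

    /** Connects to host:port and returns every edge sent before the peer closes the socket. */
    static List<long[]> readEdges(String host, int port) throws IOException {
        List<long[]> edges = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader reader = new BufferedReader(
                 new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            // readLine() returns null once the sender closes its side of the connection.
            while ((line = reader.readLine()) != null) {
                edges.add(parseEdge(line));
            }
        }
        return edges;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a generator: serves three edges on a free local port, then closes.
        try (ServerSocket server = new ServerSocket(0)) {
            Thread generator = new Thread(() -> {
                try (Socket client = server.accept();
                     PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
                    out.println("1;2");
                    out.println("2;3");
                    out.println("3;1");
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            generator.start();
            List<long[]> edges = readEdges("localhost", server.getLocalPort());
            generator.join();
            System.out.println("read " + edges.size() + " edges");
        }
    }
}
```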
>>>>
>>>> On Tue, Apr 28, 2020 at 10:56 PM Kaan Sancak <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Arvid,
>>>>>
>>>>> I am sorry for the late response. I had some deadlines, but I am back
>>>>> to work now.
>>>>> I have been trying to implement what we talked about, but I am having
>>>>> problems with the implementation.
>>>>> I have been using ZMQ to open sockets, because that is inherently
>>>>> supported in my graph generator. But I couldn’t make the connection using
>>>>> input streams.
>>>>> Do you have any specific examples that I can look at to get a better
>>>>> idea of how to implement this?
>>>>>
>>>>> Best
>>>>> Kaan
>>>>>
>>>>> On Apr 24, 2020, at 4:37 AM, Arvid Heise <ar...@ververica.com> wrote:
>>>>>
>>>>> Hm, I had assumed the sockets would work the other way around (so pulling
>>>>> like URLInputStream instead of listening). I'd go with providing the data
>>>>> on a port on each generator node and then reading from that in multiple
>>>>> sources.
>>>>>
>>>>> I think the best solution is to implement a custom InputFormat and
>>>>> then use createInput. You could implement a subclass of
>>>>> GenericInputFormat. You might even use IteratorInputFormat like this:
>>>>>
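>>>>> import java.io.BufferedReader;
>>>>> import java.io.IOException;
>>>>> import java.io.InputStream;
>>>>> import java.io.InputStreamReader;
>>>>> import java.io.ObjectInputStream;
>>>>> import java.io.Serializable;
>>>>> import java.net.URL;
>>>>> import java.nio.charset.StandardCharsets;
>>>>> import java.util.Iterator;
>>>>> import org.apache.flink.api.common.typeinfo.Types;
>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>>
>>>>> private static class URLInputIterator implements Iterator<Tuple2<Long, Long>>, Serializable {
>>>>>    private final URL url;
>>>>>    // Not serializable itself; rebuilt in readObject after deserialization.
>>>>>    private transient Iterator<Tuple2<Long, Long>> inner;
>>>>>
>>>>>    private URLInputIterator(URL url) {
>>>>>       this.url = url;
>>>>>    }
>>>>>
>>>>>    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
>>>>>       // Restore the url field first; without this it would still be null here.
>>>>>       in.defaultReadObject();
>>>>>       InputStream inputStream = url.openStream();
>>>>>       // Each line is expected to look like "<source id>;<target id>".
>>>>>       inner = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
>>>>>          .lines()
>>>>>          .map(line -> {
>>>>>             String[] parts = line.split(";");
>>>>>             return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
>>>>>          })
>>>>>          .iterator();
>>>>>    }
>>>>>
>>>>>    @Override
>>>>>    public boolean hasNext() {
>>>>>       return inner.hasNext();
>>>>>    }
>>>>>
>>>>>    @Override
>>>>>    public Tuple2<Long, Long> next() {
>>>>>       return inner.next();
>>>>>    }
>>>>> }
>>>>>
>>>>> // URL has no (host, port) constructor; the "http" protocol and "/" path are assumed here.
>>>>> env.fromCollection(new URLInputIterator(new URL("http", "gen_node1", 9999, "/")), Types.TUPLE(Types.LONG, Types.LONG));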
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Apr 24, 2020 at 9:42 AM Kaan Sancak <ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yes, that sounds like a great idea and actually that's what I am
>>>>>> trying to do.
>>>>>>
>>>>>>> Then you configure your analysis job to read from each of these
>>>>>>> sockets with a separate source and union them before feeding them to the
>>>>>>> actual job?
>>>>>>
>>>>>>
>>>>>> Before trying to open the sockets on the slave nodes, I first
>>>>>> opened just one socket on the master node, and I also ran the generator
>>>>>> with one node. I was able to read the graph and then run my algorithm
>>>>>> without any problems. This was a test run to see whether I can do it.
>>>>>>
>>>>>> After that, I opened a bunch of sockets on my generators, and now I am
>>>>>> trying to configure Flink to read from those sockets. However, I am having
>>>>>> problems while trying to assign each task manager to a separate socket. I
>>>>>> am assuming my problems are related to network binding. In my configuration
>>>>>> file, jobmanager.rpc.address is set, but I have not done
>>>>>> similar configuration for the slave nodes.
>>>>>>
>>>>>> Am I on the right track, or is there an easier way to handle this?
>>>>>>
>>>>>> I think my question is how to do the `read from each of these sockets
>>>>>> with a separate source` part.
>>>>>>
>>>>>> Thanks again
>>>>>>
>>>>>> Best
>>>>>> Kaan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Apr 24, 2020, at 3:11 AM, Arvid Heise <ar...@ververica.com> wrote:
>>>>>>
>>>>>> Hi Kaan,
>>>>>>
>>>>>> sorry, I hadn't considered I/O as the bottleneck. I thought a bit
>>>>>> more about your issue and came to a rather simple solution.
>>>>>>
>>>>>> How about you open a socket on each of your generator nodes? Then you
>>>>>> configure your analysis job to read from each of these sockets with a
>>>>>> separate source and union them before feeding them to the actual job?
>>>>>>
>>>>>> You don't need to modify much on the analysis job and each source can
>>>>>> be independently read. WDYT?
>>>>>>
>>>>>> On Fri, Apr 24, 2020 at 8:46 AM Kaan Sancak <ka...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the answer! Also thanks for raising some concerns about
>>>>>>> my question.
>>>>>>>
>>>>>>> Some of the graphs I have been using are larger than 1.5 TB. I am
>>>>>>> currently in the experiment stage of a project, so I keep making
>>>>>>> modifications to my code and re-running the experiments. On some of the
>>>>>>> largest graphs I have been using, IO has become an issue for me and keeps
>>>>>>> me waiting for a couple of hours.
>>>>>>>
>>>>>>> Moreover, I have a parallel/distributed graph generator, which I can
>>>>>>> run on the same set of nodes in my cluster. So what I wanted to do was
>>>>>>> to run my Flink program and the graph generator at the same time and feed
>>>>>>> the graph in through the generator, which should be faster than doing IO
>>>>>>> from disk. As you said, it is not essential for me to do that, but I am
>>>>>>> trying to see what I am able to do using Flink and how I can solve such
>>>>>>> problems. I was also using another framework and faced a similar problem
>>>>>>> there; I was able to reduce the graph read time from hours to minutes
>>>>>>> using this method.
>>>>>>>
>>>>>>>> Do you really have more main memory than disk space?
>>>>>>>
>>>>>>>
>>>>>>> My issue is actually not storage related; I am trying to see how I can
>>>>>>> reduce the IO time.
>>>>>>>
>>>>>>> One trick that came to my mind is creating a dummy dataset and using a
>>>>>>> map function on it: I can open up a bunch of sockets, listen to the
>>>>>>> generator, and collect the generated data. I am trying to see how it will
>>>>>>> turn out.
>>>>>>>
>>>>>>>> Alternatively, if graph generation is rather cheap, you could also
>>>>>>>> try to incorporate it directly into the analysis job.
>>>>>>>
>>>>>>>
>>>>>>> I am not familiar with the analysis jobs. I will look into it.
>>>>>>>
>>>>>>> Again, this is actually not a problem, I am just trying to
>>>>>>> experiment with the framework and see what I can do. I am very new to
>>>>>>> Flink, so my methods might be wrong. Thanks for the help!
>>>>>>>
>>>>>>> Best
>>>>>>> Kaan
>>>>>>>
>>>>>>>
>>>>>>> On Apr 23, 2020, at 10:51 AM, Arvid Heise <ar...@ververica.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Kaan,
>>>>>>>
>>>>>>> afaik there is no (easy) way to switch from streaming back to batch
>>>>>>> API while retaining all data in memory (correct me if I misunderstood).
>>>>>>>
>>>>>>> However, from your description, I also have some serious trouble
>>>>>>> understanding the problem. Why can't you dump the data to some file? Do
>>>>>>> you really have more main memory than disk space? Or do you have no shared
>>>>>>> memory between your generating cluster and the Flink cluster?
>>>>>>>
>>>>>>> It almost sounds as if the issue at heart is rather to find a good
>>>>>>> serialization format for storing the edges. The 70 billion edges could
>>>>>>> be stored in an array of id pairs, which amounts to ~560 GB of uncompressed
>>>>>>> data if stored in Avro (or any other binary serialization format) when ids
>>>>>>> are longs. That's not much by today's standards and could also be easily
>>>>>>> offloaded to S3.
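
As a rough cross-check of that estimate, assuming fixed-width ids and no per-record overhead: 70 billion edges times two ids comes to about 1.12 TB with 8-byte longs and about 560 GB with 4-byte ids. Avro encodes longs with a variable-length zig-zag encoding, so the actual size depends on how large the id values are. `EdgeListSize` and `rawBytes` are hypothetical names used only for this illustration.

```java
/** Back-of-the-envelope size of an edge list stored as fixed-width id pairs. */
final class EdgeListSize {

    /** Bytes needed for the given number of edges when each of the two ids takes bytesPerId bytes. */
    static long rawBytes(long edges, int bytesPerId) {
        return edges * 2L * bytesPerId;
    }

    public static void main(String[] args) {
        long edges = 70_000_000_000L; // 70 billion edges, the figure quoted in the thread
        System.out.println(rawBytes(edges, 8) / 1e9 + " GB with 8-byte ids");
        System.out.println(rawBytes(edges, 4) / 1e9 + " GB with 4-byte ids");
    }
}
```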
>>>>>>>
>>>>>>> Alternatively, if graph generation is rather cheap, you could also
>>>>>>> try to incorporate it directly into the analysis job.
>>>>>>>
>>>>>>> On Wed, Apr 22, 2020 at 2:58 AM Kaan Sancak <ka...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have been running some experiments on large graph data; the smallest
>>>>>>>> graph I have been using has around 70 billion edges. I have a graph
>>>>>>>> generator, which generates the graph in parallel and feeds it to the
>>>>>>>> running system. However, it takes a lot of time to read the edges, because
>>>>>>>> even though the graph generation process is parallel, in Flink I can only
>>>>>>>> listen from the master node (correct me if I am wrong). Another option is
>>>>>>>> dumping the generated data to a file and reading it with readCsvFile;
>>>>>>>> however, this is not feasible in terms of storage management.
>>>>>>>>
>>>>>>>> What I want to do is invoke my graph generator using IPC/TCP
>>>>>>>> protocols and read the generated data from the sockets. Since the graph
>>>>>>>> data is also generated in parallel on each node, I want to make use of IPC
>>>>>>>> and read the data in parallel on each node. I did some digging online but
>>>>>>>> couldn’t find anything similar using the DataSet API. I would be glad if
>>>>>>>> you have some similar use cases or examples.
>>>>>>>>
>>>>>>>> Is it possible to use the streaming environment to create the data in
>>>>>>>> parallel and then switch to the DataSet API?
>>>>>>>>
>>>>>>>> Thanks in advance!
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Kaan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Arvid Heise | Senior Java Developer
>>>>>>> <https://www.ververica.com/>
>>>>>>>
>>>>>>> Follow us @VervericaData
>>>>>>> --
>>>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>>>> Conference
>>>>>>> Stream Processing | Event Driven | Real Time
>>>>>>> --
>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>>> --
>>>>>>> Ververica GmbH
>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>>>>> Ji (Toni) Cheng
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
