Posted to dev@flink.apache.org by CPC <ac...@gmail.com> on 2016/06/07 16:26:05 UTC

DataStream split/select behaviour

Hello everyone,

When I use DataStream split/select, it always sends all selected records to
the same TaskManager. Is there any reason for this behaviour? Also, is it
possible to implement the same split/select behaviour for the DataSet API
(without using a different filter for every output)? I found this issue,
https://issues.apache.org/jira/browse/FLINK-87, but it is still open...

Thanks...

Re: DataStream split/select behaviour

Posted by Till Rohrmann <tr...@apache.org>.
Hi,

The directed output via the split and select methods is indeed only
available in the DataStream API. Thus, in order to achieve the same with
the DataSet API, you would have to apply multiple filters, as you've
already written.
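
A minimal sketch of the multiple-filter approach for the DataSet API, assuming
the Flink Scala DataSet API (flink-scala) is on the classpath. The Click record
type, the splitById helper and the sample values are hypothetical and only
serve the illustration; they do not come from the original program.

```scala
import org.apache.flink.api.scala._

object DataSetFilterSplit {

  // Hypothetical record type, used only for this illustration.
  case class Click(curID: Int, num: Int)

  // One filter per logical output. Both filters read the same input DataSet,
  // so every record is evaluated once per filter.
  def splitById(clicks: DataSet[Click], id: Int): (DataSet[Click], DataSet[Click]) =
    (clicks.filter(_.curID == id), clicks.filter(_.curID != id))

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Made-up sample data.
    val clicks: DataSet[Click] = env.fromElements(
      Click(14533, 10), Click(42, 3), Click(14533, 7), Click(7, 1))

    val (selected, others) = splitById(clicks, 14533)

    // print() triggers execution of the DataSet program.
    selected.print()
    others.print()
  }
}
```

Each additional output needs its own filter, which is the cost the original
question was trying to avoid.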

The result of the select call will only be sent to the same TaskManager
if all subtasks of the downstream operator are executed on this TaskManager.
Be aware that subtasks of different operators will share the same slot if
you haven't assigned a different slot sharing group to the operator. So, for
example, if you have 2 TaskManagers with 2 slots each, then you have 4 slots
in total. If you now have two select operators with a parallelism of 2
each, then these two operators could be executed on the same TaskManager.
For more information about the slot sharing model, see the following link:
https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#workers-slots-resources
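
The slot arithmetic in the example above can be sketched in plain Scala. This
is only a back-of-the-envelope model, not Flink's scheduler: all names
(SlotSharingSketch, totalSlots, slotsNeeded and so on) are hypothetical, and it
assumes that with default slot sharing a job needs as many slots as its highest
operator parallelism.

```scala
object SlotSharingSketch {

  // Total number of slots offered by the cluster.
  def totalSlots(taskManagers: Int, slotsPerTaskManager: Int): Int =
    taskManagers * slotsPerTaskManager

  // With default slot sharing, one slot can hold one subtask of each operator,
  // so the job needs as many slots as the highest operator parallelism.
  def slotsNeeded(parallelisms: Seq[Int]): Int =
    if (parallelisms.isEmpty) 0 else parallelisms.max

  // If every operator is put into its own slot sharing group,
  // each subtask occupies a slot of its own.
  def slotsNeededWithoutSharing(parallelisms: Seq[Int]): Int =
    parallelisms.sum

  // The whole job could land on a single TaskManager if the shared slots fit there.
  def fitsOnOneTaskManager(parallelisms: Seq[Int], slotsPerTaskManager: Int): Boolean =
    slotsNeeded(parallelisms) <= slotsPerTaskManager

  def main(args: Array[String]): Unit = {
    val parallelisms = Seq(2, 2) // two operators with a parallelism of 2 each

    println(totalSlots(2, 2))                      // 4 slots in total
    println(slotsNeeded(parallelisms))             // 2 slots with slot sharing
    println(slotsNeededWithoutSharing(parallelisms)) // 4 slots without sharing
    println(fitsOnOneTaskManager(parallelisms, 2)) // true
  }
}
```

Under this model the job from the example occupies only 2 of the 4 slots, so
both operators could end up on one TaskManager while the other stays idle.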

Cheers,
Till


Re: DataStream split/select behaviour

Posted by CPC <ac...@gmail.com>.
Sorry, I think I misunderstood the issue. But it seems DataStream partitions
the data by some field, and when I select that field, only one TaskManager
processes the data. I get the same result when I use filter. Below is
the code:


import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

case class WikiData(prevID: Option[Int], curID: Int, num: Int,
                    prevTitle: String, curTitle: String, ttype: String)

object StreamingSelect {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    //val rootPath = "gs://cpcflink/wikistream/"
    val stream: DataStream[String] = env.createInput(
      new TextInputFormat(
        new Path("/home/capacman/Data/wiki/2015_01_en_clickstream.tsv")))

    // Parse each tab-separated line into a WikiData record.
    val wikiStream = stream.map {
      line =>
        val values = line.split("\t")
        WikiData(
          if (values(0).isEmpty) None else Some(Integer.parseInt(values(0))),
          Integer.parseInt(values(1)),
          Integer.parseInt(values(2)),
          values(3),
          values(4),
          if (values.length < 6) null else values(5)
        )
    }

    // Tag records with curID 14533; all other records get no output name.
    val split = wikiStream
      .split(i => if (i.curID == 14533) List("14533") else List.empty)
    val stream14533 = split.select("14533").map(i => (i.curID, i.num))

    stream14533.writeAsCsv("/home/capacman/Data/wiki/14533")

    env.execute()
  }
}
