Posted to dev@tez.apache.org by Hitesh Sharma <hi...@microsoft.com.INVALID> on 2017/09/19 23:24:21 UTC

Custom routing in EdgeManager

Hello,


I'm looking to add a custom edge manager that lets me route events between two vertices using a custom protocol. For instance, I want DataMovementEvent(s) from tasks in the source vertex to be routed to tasks in the destination vertex based on whether the tasks are in the same rack (or, for that matter, on some other key that relates the tasks in the two stages). To do this I implemented my own EdgeManagerPluginOnDemand derivative, but I see it has two APIs for routing the events:


routeDataMovementEventToDestination(int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex)
<https://tez.apache.org/releases/0.8.2/tez-api-javadocs/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.html#routeDataMovementEventToDestination(int,%20int,%20int)>

routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex, int sourceOutputIndex, Map<Integer,List<Integer>> destinationTaskAndInputIndices)
<https://tez.apache.org/releases/0.8.2/tez-api-javadocs/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.html#routeDataMovementEventToDestination(org.apache.tez.runtime.api.events.DataMovementEvent,%20int,%20int,%20java.util.Map)>

My questions are:


  *   What's the difference between the two APIs, and which one should be used? The overload that takes a DataMovementEvent doesn't seem to get called with the ScatterGather edge manager and others.
  *   If that overload is deprecated, then the remaining one is not sufficient for my routing, because I need more metadata, which I could otherwise have taken from the DataMovementEvent payload, for example. What options do I have here?
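
As an illustration of the rack-based routing idea described in this message, here is a minimal stand-alone sketch in plain Java. It is not a Tez plugin and uses no Tez classes: the class name RackRoutingSketch, the method destinationsFor, and the per-task rack arrays are all hypothetical, and how an edge manager would actually learn task placement is an assumption the thread does not settle.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Stand-alone illustration only; this is NOT a Tez EdgeManagerPluginOnDemand.
 * All names here are hypothetical.
 */
final class RackRoutingSketch {

    /**
     * Returns the destination task indices that share a rack with the source task.
     *
     * @param sourceTaskIndex index of the producing task
     * @param sourceRacks     rack id per source task (assumed to be known to the router)
     * @param destRacks       rack id per destination task (same assumption)
     */
    static List<Integer> destinationsFor(int sourceTaskIndex, String[] sourceRacks, String[] destRacks) {
        String rack = sourceRacks[sourceTaskIndex];
        List<Integer> result = new ArrayList<>();
        for (int dest = 0; dest < destRacks.length; dest++) {
            if (rack.equals(destRacks[dest])) {
                result.add(dest);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] sourceRacks = {"rack-a", "rack-b", "rack-a"};
        String[] destRacks = {"rack-b", "rack-a", "rack-a"};
        System.out.println(destinationsFor(0, sourceRacks, destRacks)); // [1, 2]
        System.out.println(destinationsFor(1, sourceRacks, destRacks)); // [0]
    }
}
```

Any other routing key (pod, host group, a value carried in the event payload) would fit the same shape: compute the key for the source, then select the destinations whose key matches.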


Thanks,

Hitesh

Re: Custom routing in EdgeManager

Posted by Gopal Vijayaraghavan <go...@apache.org>.
> I have implemented/overridden routeCompositeDataMovementEventToDestination but it isn't getting called. I'm raising DataMovementEvents though (and not composite ones), so it might be expected?

I'm not sure whether you're raising composite DMEs or not, so I'm speaking purely from the point of view of Hive + a simple shuffle edge with auto-reducer parallelism.

> - Difference between the overloads of routeDataMovementEventToDestination (is any of them deprecated?)

They're called from different code paths, so all of them are in use; most likely, when the newer overloads were added, the existing call sites were not updated to use them.

You should probably set a breakpoint in maybeAddTezEventForDestinationTask() and see where it goes.

There are people on this list who use these APIs more often than I do. Perhaps a bit of API housekeeping would make things more consistent without breaking too much: since OnDemand is an abstract class and not an interface, it should be able to change its internals to chain the APIs together, calling them in a hierarchy so that all override cases are handled.
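
The chaining idea above can be sketched in plain Java. This is a hypothetical stand-in, not the Tez API: ChainedRoutingDemo, Router, routeByIndex and routeWithPayload are invented names, and the byte[] payload is a simplification. The point it shows is that an abstract base class can give the wider method a default body that delegates to the narrower one, so a single call site honours whichever method a subclass overrides.

```java
/** Hypothetical illustration of chaining overloads in an abstract base class; not the Tez API. */
final class ChainedRoutingDemo {

    abstract static class Router {
        /** Narrow hook: route by indices only. Returns a destination input index, or -1 to skip. */
        int routeByIndex(int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) {
            return -1;
        }

        /**
         * Wider hook: also sees the event payload. The default chains to the narrow hook,
         * so a caller that only ever invokes this method still reaches index-only overrides.
         */
        int routeWithPayload(byte[] payload, int sourceTaskIndex, int sourceOutputIndex,
                             int destinationTaskIndex) {
            return routeByIndex(sourceTaskIndex, sourceOutputIndex, destinationTaskIndex);
        }
    }

    /** Overrides only the narrow hook. */
    static final class IndexOnlyRouter extends Router {
        @Override
        int routeByIndex(int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) {
            return sourceTaskIndex;
        }
    }

    /** Overrides only the wide hook and uses the payload as extra routing metadata. */
    static final class PayloadAwareRouter extends Router {
        @Override
        int routeWithPayload(byte[] payload, int sourceTaskIndex, int sourceOutputIndex,
                             int destinationTaskIndex) {
            boolean matches = payload.length > 0 && payload[0] == destinationTaskIndex;
            return matches ? sourceTaskIndex : -1;
        }
    }

    public static void main(String[] args) {
        byte[] payload = {5};
        // The "framework" side always calls the widest method.
        System.out.println(new IndexOnlyRouter().routeWithPayload(payload, 3, 0, 5));    // 3
        System.out.println(new PayloadAwareRouter().routeWithPayload(payload, 3, 0, 5)); // 3
        System.out.println(new PayloadAwareRouter().routeWithPayload(payload, 3, 0, 6)); // -1
    }
}
```

An interface without default methods could not offer this, which is presumably why the abstract-class point matters here.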

> - Difference between EdgeManagerPlugin and EdgeManagerPluginOnDemand

https://github.com/apache/tez/blob/master/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java#L271

> - Are there any scale advantages of using CompositeDataMovementEvent vs DataMovementEvent? My naive understanding says that the former is more of a convenience thing and from a scale point of view there may be no difference.

That really depends on whether you are using composite events as-is. getEvents() returns an Iterable, so there is a definite scale advantage in sending one composite event over the wire instead of sending 1000 copies of the same payload.

See the code in

https://github.com/apache/tez/blob/master/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java#L433
+
https://github.com/apache/tez/blob/master/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java#L111
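
To make the scale argument concrete, here is a simplified stand-alone model in plain Java. These are not the Tez event classes: CompositeEventSketch, SingleEvent, CompositeEvent and events() are hypothetical names chosen for illustration. The sketch assumes a composite event carries a start index, a count and one payload, and expands into per-index events lazily through an Iterable, so only one copy of the payload needs to travel.

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Simplified model for illustration; these are NOT the Tez event classes. */
final class CompositeEventSketch {

    /** Hypothetical per-source event: an index plus a reference to the payload. */
    static final class SingleEvent {
        final int sourceIndex;
        final byte[] payload;

        SingleEvent(int sourceIndex, byte[] payload) {
            this.sourceIndex = sourceIndex;
            this.payload = payload;
        }
    }

    /** Hypothetical composite: covers indices [start, start + count) with one payload. */
    static final class CompositeEvent {
        final int start;
        final int count;
        final byte[] payload;

        CompositeEvent(int start, int count, byte[] payload) {
            this.start = start;
            this.count = count;
            this.payload = payload;
        }

        /** Expands lazily: single events are created one at a time, on demand. */
        Iterable<SingleEvent> events() {
            return () -> new Iterator<SingleEvent>() {
                private int cursor = start;

                @Override
                public boolean hasNext() {
                    return cursor < start + count;
                }

                @Override
                public SingleEvent next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    // Every expanded event shares the same payload array; nothing is copied.
                    return new SingleEvent(cursor++, payload);
                }
            };
        }
    }

    public static void main(String[] args) {
        byte[] payload = "host:port".getBytes(StandardCharsets.UTF_8);
        CompositeEvent composite = new CompositeEvent(0, 1000, payload);
        int expanded = 0;
        for (SingleEvent e : composite.events()) {
            expanded++;
        }
        System.out.println(expanded); // 1000
    }
}
```

In this model, 1000 logical events cost one payload plus two integers on the wire, whereas 1000 individual events would each carry their own copy.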

Cheers,
Gopal    
    





Re: Custom routing in EdgeManager

Posted by Hitesh Sharma <hi...@microsoft.com.INVALID>.
Thanks Gopal. Yes, rack/pod aggregates is something we are thinking about. I will go through the discussion on that JIRA to understand things a little better.

I have implemented/overridden routeCompositeDataMovementEventToDestination but it isn't getting called. I'm raising DataMovementEvents though (and not composite ones), so it might be expected?

If I set breakpoints in all the flavors of routeDataMovementEvent* in the ScatterGatherEdgeManager, then I only see routeDataMovementEventToDestination(int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex) getting called. This makes me wonder about the following:

- Difference between the overloads of routeDataMovementEventToDestination (is any of them deprecated?)
- Difference between EdgeManagerPlugin and EdgeManagerPluginOnDemand
- Are there any scale advantages of using CompositeDataMovementEvent vs DataMovementEvent? My naive understanding says that the former is more of a convenience thing and from a scale point of view there may be no difference.

Thanks,
Hitesh



________________________________
From: Gopal Vijayaraghavan <go...@apache.org>
Sent: Tuesday, September 19, 2017 7:59 PM
To: dev@tez.apache.org
Subject: Re: Custom routing in EdgeManager


> For instance I want to say that DataMovementEvent(s) from the tasks in the source vertex should be routed to the tasks in the destination  vertex based on the fact whether the tasks are in the same rack or not (or for that matter use some other key to route events between the tasks in the two stages).

There was an attempt at scheduling a rack-local combiner task to speed up dedup ops (by doing per-rack aggregates) - https://issues.apache.org/jira/browse/TEZ-145

I'm wondering if you're trying to do something similar.

> To do this I implemented my own EdgeManagerPluginOnDemand derivative but I see it has two APIs  for routing the events:

I think you might not have overridden routeCompositeDataMovementEventToDestination().

If you want to submit a patch with additional log lines to the Tez Edge.java, I think that might be one place which is under-logged for these cases.

Cheers,
Gopal





Re: Custom routing in EdgeManager

Posted by Gopal Vijayaraghavan <go...@apache.org>.
> For instance I want to say that DataMovementEvent(s) from the tasks in the source vertex should be routed to the tasks in the destination  vertex based on the fact whether the tasks are in the same rack or not (or for that matter use some other key to route events between the tasks in the two stages). 

There was an attempt at scheduling a rack-local combiner task to speed up dedup ops (by doing per-rack aggregates) - https://issues.apache.org/jira/browse/TEZ-145

I'm wondering if you're trying to do something similar.

> To do this I implemented my own EdgeManagerPluginOnDemand derivative but I see it has two APIs  for routing the events:

I think you might not have overridden routeCompositeDataMovementEventToDestination().

If you want to submit a patch with additional log lines to the Tez Edge.java, I think that might be one place which is under-logged for these cases.

Cheers,
Gopal
    
    



Re: Custom routing in EdgeManager

Posted by Hitesh Sharma <hi...@microsoft.com.INVALID>.
Fixing the formatting and resending.

I'm looking to add a custom edge manager that lets me route events between two vertices using a custom protocol. For instance, I want DataMovementEvent(s) from tasks in the source vertex to be routed to tasks in the destination vertex based on whether the tasks are in the same rack (or, for that matter, on some other key that relates the tasks in the two stages). To do this I implemented my own EdgeManagerPluginOnDemand derivative, but I see it has two APIs for routing the events:

routeDataMovementEventToDestination(int sourceTaskIndex, int sourceOutputIndex, int destinationTaskIndex)
routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex, int sourceOutputIndex, Map<Integer,List<Integer>> destinationTaskAndInputIndices)

My questions are:

- What's the difference between the two APIs, and which one should be used? The overload that takes a DataMovementEvent doesn't seem to get called with the ScatterGather edge manager and others.
- If that overload is deprecated, then the remaining one is not sufficient for my routing, because I need more metadata, which I could otherwise have taken from the DataMovementEvent payload, for example. What options do I have here?

Thanks,
Hitesh

