You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@tez.apache.org by "Piyush Narang (JIRA)" <ji...@apache.org> on 2016/08/05 01:37:20 UTC

[jira] [Comment Edited] (TEZ-3369) Fixing Tez's DAGClients to work with Cascading

    [ https://issues.apache.org/jira/browse/TEZ-3369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15408750#comment-15408750 ] 

Piyush Narang edited comment on TEZ-3369 at 8/5/16 1:36 AM:
------------------------------------------------------------

Ok, spent some time checking out the Tez code, I'm thinking of the following APIs. [~hitesh], would be great to get your thoughts.
1) getDAGInformation(dagID) -> returns a DAGInformation struct which has details like dag name, id, list of vertices (just names, ids)
2) getTaskInformation(dagId, vertexId, taskId) -> returns a TaskInfo object (has taskId, state, scheduled time, start time, endTime, counters etc)
3) getTaskInformationList(dagId, vertexId, startTaskId, limit) -> returns a List<TaskInfo>. This provides a paginated view of the tasks for a vertex. 

Had a follow up question based on looking at the rpc / http clients. Seems like the RPC implementation only supports vending out DAG details for the current DAG (whereas ATS seems to provide info for tasks belonging to others as well?). Is there any way we can retrieve this in the RPC server too? Would it be ok to keep cached copies of the DAG objects that are not current in the DAGAppMaster? 


was (Author: pnarang):
Ok, spent some time checking out the Tez code, here I'm thinking of the following APIs. [~hitesh], would be great to get your thoughts.
1) getDAGInformation(dagID) -> returns a DAGInformation struct which has details like dag name, id, list of vertices (just names, ids)
2) getTaskInformation(dagId, vertexId, taskId) -> returns a TaskInfo object (has taskId, state, scheduled time, start time, endTime, counters etc)
3) getTaskInformationList(dagId, vertexId, startTaskId, limit) -> returns a List<TaskInfo>. This provides a paginated view of the tasks for a vertex. 

Had a follow up question based on looking at the rpc / http clients. Seems like the RPC implementation only supports vending out DAG details for the current DAG (whereas ATS seems to provide info for tasks belonging to others as well?). Is there any way we can retrieve this in the RPC server too? Would it be ok to keep cached copies of the DAG objects that are not current in the DAGAppMaster? 

> Fixing Tez's DAGClients to work with Cascading
> ----------------------------------------------
>
>                 Key: TEZ-3369
>                 URL: https://issues.apache.org/jira/browse/TEZ-3369
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Piyush Narang
>
> Hi,
> We seem to be running into issues when we try to use the newest version of Tez (0.9.0-SNAPSHOT) with Cascading. The issue seems to be:
> {code}
> java.lang.ClassCastException: cascading.stats.tez.util.TezTimelineClient cannot be cast to org.apache.tez.dag.api.client.DAGClient
> 	at cascading.stats.tez.util.TezStatsUtil.createTimelineClient(TezStatsUtil.java:142)
> {code}
> (Full stack trace at the end)
> Relevant Cascading code is:
> 1) [Cascading tries to create a TezTimelineClient and cast it to a DAGClient | https://github.com/Cascading/cascading/blob/3.1/cascading-hadoop2-tez-stats/src/main/java/cascading/stats/tez/util/TezStatsUtil.java#L142]
> 2) [TezTimelineClient extends from DAGClientTimelineImpl | https://github.com/Cascading/cascading/blob/3.1/cascading-hadoop2-tez-stats/src/main/java/cascading/stats/tez/util/TezTimelineClient.java#L53]
> 3) [DAGClientTimelineImpl extends from DAGClientInternal | https://github.com/apache/tez/blob/dacd0191b684208d71ea457ca849f2d01212bb7e/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java#L68]
> 4) [DAGClientInternal extends Closeable which is why things break | https://github.com/apache/tez/blob/dacd0191b684208d71ea457ca849f2d01212bb7e/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java#L38].
> This behavior was 'broken' in this [commit | https://github.com/apache/tez/commit/2af886b509015200e1c04527275474cbc771c667] (release 0.8.3)
> The TezTimelineClient in Cascading seems to do two things:
> 1) DAGClient functionalities - ends up delegating to the inner DAGClient object.
> 2) Retrieve stuff like vertexID, vertexChildren and vertexChild (from this [interface|https://github.com/Cascading/cascading/blob/3.1/cascading-hadoop2-tez-stats/src/main/java/cascading/stats/tez/util/TimelineClient.java#L31]). 
> As there's no good way to get the vertexID / vertexChildren / vertexChild (correct me if I'm wrong), they end up extending from the DAGClientTimelineImpl which has the http client and json parsing code to allow [things like this | https://github.com/Cascading/cascading/blob/3.1/cascading-hadoop2-tez-stats/src/main/java/cascading/stats/tez/util/TezTimelineClient.java#L93]:
> {code}
> @Override
>   public String getVertexID( String vertexName ) throws IOException, TezException
>     {
>     // the filter 'vertexName' is in the 'otherinfo' field, so it must be requested, otherwise timeline server throws
>     // an NPE. to be safe, we include both fields in the result
>     String format = "%s/%s?primaryFilter=%s:%s&secondaryFilter=vertexName:%s&fields=%s";
>     String url = String.format( format, baseUri, TEZ_VERTEX_ID, TEZ_DAG_ID, dagId, vertexName, FILTER_BY_FIELDS );
>     JSONObject jsonRoot = getJsonRootEntity( url );
>     JSONArray entitiesNode = jsonRoot.optJSONArray( ENTITIES );
> ...
> {code}
> Some options I can think of:
> 1) Ideally these methods getVertexID / getVertexChildren / getVertexChild would be part of DAGClient? Or even part of the DAGClientTimelineImpl? That way the cascading code wouldn't need updating if the uri changed / json format changed, it would end up being updated in these clients as well. I suspect adding this to DAGClient would require more work as it'll also need to be supported by the RPCClient and I don't think there are the relevant protos and such available. 
> 2) A simpler fix would be to have DAGClientInternal extend DAGClient (currently it just implements Closeable). This will not require any changes on the Cascading side as DAGClientTimelineImpl will continue to be a DAGClient. 
> Full stack trace:
> {code}
> Exception in thread "flow com.twitter.data_platform.e2e_testing.jobs.parquet.E2ETestConvertThriftToParquet" java.lang.ClassCastException: cascading.stats.tez.util.TezTimelineClient cannot be cast to org.apache.tez.dag.api.client.DAGClient
> 	at cascading.stats.tez.util.TezStatsUtil.createTimelineClient(TezStatsUtil.java:142)
> 	at cascading.flow.tez.planner.Hadoop2TezFlowStepJob$1.getJobStatusClient(Hadoop2TezFlowStepJob.java:117)
> 	at cascading.flow.tez.planner.Hadoop2TezFlowStepJob$1.getJobStatusClient(Hadoop2TezFlowStepJob.java:105)
> 	at cascading.stats.tez.TezStepStats$1.getJobStatusClient(TezStepStats.java:60)
> 	at cascading.stats.tez.TezStepStats$1.getJobStatusClient(TezStepStats.java:56)
> 	at cascading.stats.CounterCache.cachedCounters(CounterCache.java:229)
> 	at cascading.stats.CounterCache.cachedCounters(CounterCache.java:187)
> 	at cascading.stats.CounterCache.getCounterValue(CounterCache.java:167)
> 	at cascading.stats.BaseCachedStepStats.getCounterValue(BaseCachedStepStats.java:105)
> 	at cascading.stats.FlowStats.getCounterValue(FlowStats.java:170)
> 	at cascading.flow.tez.Hadoop2TezFlow.getTotalSliceCPUMilliSeconds(Hadoop2TezFlow.java:303)
> 	at cascading.flow.BaseFlow.run(BaseFlow.java:1287)
> 	at cascading.flow.BaseFlow.access$100(BaseFlow.java:82)
> 	at cascading.flow.BaseFlow$1.run(BaseFlow.java:928)
> 	at java.lang.Thread.run(Thread.java:745)
> Exception in thread "main" java.lang.Throwable: If you know what exactly caused this error, please consider contributing to GitHub via following link.
> https://github.com/twitter/scalding/wiki/Common-Exceptions-and-possible-reasons#javalangclasscastexception
> 	at com.twitter.scalding.Tool$.main(Tool.scala:152)
> 	at com.twitter.scalding.Tool.main(Tool.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
> 	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
> Caused by: java.lang.ClassCastException: cascading.stats.tez.util.TezTimelineClient cannot be cast to org.apache.tez.dag.api.client.DAGClient
> 	at cascading.stats.tez.util.TezStatsUtil.createTimelineClient(TezStatsUtil.java:142)
> 	at cascading.flow.tez.planner.Hadoop2TezFlowStepJob$1.getJobStatusClient(Hadoop2TezFlowStepJob.java:117)
> 	at cascading.flow.tez.planner.Hadoop2TezFlowStepJob$1.getJobStatusClient(Hadoop2TezFlowStepJob.java:105)
> 	at cascading.stats.tez.TezStepStats$1.getJobStatusClient(TezStepStats.java:60)
> 	at cascading.stats.tez.TezStepStats$1.getJobStatusClient(TezStepStats.java:56)
> 	at cascading.stats.CounterCache.cachedCounters(CounterCache.java:229)
> 	at cascading.stats.CounterCache.cachedCounters(CounterCache.java:187)
> 	at cascading.stats.CounterCache.getCountersFor(CounterCache.java:155)
> 	at cascading.stats.BaseCachedStepStats.getCountersFor(BaseCachedStepStats.java:93)
> 	at cascading.stats.FlowStats.getCountersFor(FlowStats.java:159)
> 	at com.twitter.scalding.Stats$.getAllCustomCounters(Stats.scala:93)
> 	at com.twitter.scalding.Job.handleStats(Job.scala:269)
> 	at com.twitter.scalding.Job.run(Job.scala:298)
> 	at com.twitter.scalding.Tool.start$1(Tool.scala:124)
> 	at com.twitter.scalding.Tool.run(Tool.scala:140)
> 	at com.twitter.scalding.Tool.run(Tool.scala:68)
> 	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
> 	at com.twitter.scalding.Tool$.main(Tool.scala:148)
> 	... 7 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)