Posted to issues@tez.apache.org by "Rajesh Balamohan (JIRA)" <ji...@apache.org> on 2015/06/01 10:05:17 UTC

[jira] [Updated] (TEZ-2505) PipelinedSorter attempts to compare keys from EOF'd streams

     [ https://issues.apache.org/jira/browse/TEZ-2505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajesh Balamohan updated TEZ-2505:
----------------------------------
    Attachment: TEZ-2505.branch-0.7.0.patch

Cascading's [TupleComparator|https://github.com/cwensel/cascading/blob/30c4b21552ad0db4e3ed3fe8dfed3e702945851a/cascading-hadoop/src/main/shared/cascading/tuple/hadoop/util/TupleComparator.java#L46] internally uses Cascading's [BufferedInputStream|https://github.com/cwensel/cascading/blob/30c4b21552ad0db4e3ed3fe8dfed3e702945851a/cascading-hadoop/src/main/shared/cascading/tuple/hadoop/util/DeserializerComparator.java#L41] and clears the buffer in a finally block.  PipelinedSorter uses a single comparator instance, and because it is multi-threaded, the TupleComparator's stream was getting corrupted: one thread would clear the buffer while another thread was still reading from it, which caused the EOFException.
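
To illustrate the failure mode described above, here is a minimal sketch. It is not the attached patch and does not use Tez or Cascading code: StatefulComparator is a hypothetical stand-in for a comparator that keeps scratch buffers and clears them in a finally block, and perThread is a hypothetical helper showing one possible way to stop threads from sharing such an instance (one comparator per thread).

```java
import java.util.Comparator;
import java.util.function.Supplier;

public class PerThreadComparatorSketch {

    /** Hypothetical comparator that keeps mutable scratch state between calls. */
    static final class StatefulComparator implements Comparator<byte[]> {
        // Scratch buffers: every caller of this instance reads and writes them.
        private byte[] left;
        private byte[] right;

        @Override
        public int compare(byte[] a, byte[] b) {
            try {
                left = a;
                right = b;
                return compareBuffers();
            } finally {
                // Mirrors the "clear the buffer in finally" behaviour: if another
                // thread is inside compareBuffers() on the same instance, it now
                // sees its input vanish mid-read.
                left = null;
                right = null;
            }
        }

        // Unsigned lexicographic comparison of the two scratch buffers.
        private int compareBuffers() {
            int n = Math.min(left.length, right.length);
            for (int i = 0; i < n; i++) {
                int c = Integer.compare(left[i] & 0xff, right[i] & 0xff);
                if (c != 0) {
                    return c;
                }
            }
            return Integer.compare(left.length, right.length);
        }
    }

    /** Wraps a comparator factory so that each thread lazily gets its own instance. */
    static <T> Comparator<T> perThread(Supplier<Comparator<T>> factory) {
        ThreadLocal<Comparator<T>> local = ThreadLocal.withInitial(factory);
        return (a, b) -> local.get().compare(a, b);
    }
}
```

With a single shared StatefulComparator, two sorter threads can interleave so that one thread's finally block clears the buffers while the other is still comparing. Wrapping the factory with perThread(StatefulComparator::new) gives each thread a private instance, so the clearing only ever affects the thread that owns it.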

Attaching a patch for the "remotes/origin/branch-0.7" branch.  The issue was reproducible with a simple Cascading (0.3) wordcount job on a test cluster.  Tested the patch on a local VM and on a medium-scale cluster without any issues with PipelinedSorter.

[~gopalv], [~sseth] - Please review  

> PipelinedSorter attempts to compare keys from EOF'd streams
> -----------------------------------------------------------
>
>                 Key: TEZ-2505
>                 URL: https://issues.apache.org/jira/browse/TEZ-2505
>             Project: Apache Tez
>          Issue Type: Bug
>    Affects Versions: 0.7.0
>         Environment: Scalding 0.13.1+PR1220 ; Cascading-3.0.0-wip-118 ; scala 2.11.6 ; java openjdk 1.8.0_45-internal ; Debian linux 8 (stable); Intel(R) Core(TM) i7-3770 (amd64)
>            Reporter: Cyrille Chépélov
>         Attachments: TEZ-2505.branch-0.7.0.patch
>
>
> When attempting to run the same multi-DAG application (which worked fine under the same environment, except with Cascading-3.0.0-wip-115 and tez 0.6.1), one of the early, and simplest, DAGs crashes in the PipelinedSorter.
> The stack at the crash site looks like:
> {code}
> 2015-05-28 11:52:47,120 ERROR [TezChild] element.TrapHandler: caught Throwable, no trap available, rethrowing
> cascading.CascadingException: unable to compare stream elements in position: 0
>         at cascading.tuple.hadoop.util.DeserializerComparator.compareTuples(DeserializerComparator.java:164)
>         at cascading.tuple.hadoop.util.TupleComparator.compare(TupleComparator.java:38)
>         at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SortSpan.compareKeys(PipelinedSorter.java:669)
>         at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SortSpan.compare(PipelinedSorter.java:684)
>         at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:99)
>         at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:63)
>         at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SortSpan.sort(PipelinedSorter.java:631)
>         at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter.sort(PipelinedSorter.java:230)
>         at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter.collect(PipelinedSorter.java:311)
>         at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter.write(PipelinedSorter.java:272)
>         at org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput$1.write(OrderedPartitionedKVOutput.java:164)
>         at cascading.flow.tez.stream.element.OldOutputCollector.collect(OldOutputCollector.java:51)
>         at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
>         at cascading.flow.tez.stream.element.TezCoGroupGate.wrapGroupingAndCollect(TezCoGroupGate.java:193)
>         at cascading.flow.hadoop.stream.HadoopGroupGate.receive(HadoopGroupGate.java:103)
>         at cascading.flow.hadoop.stream.HadoopGroupGate.receive(HadoopGroupGate.java:45)
>         at cascading.flow.stream.element.FunctionEachStage$1.collect(FunctionEachStage.java:81)
>         at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
>         at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
>         at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:49)
>         at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:47)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:750)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
>         at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:47)
>         at cascading.flow.stream.element.FunctionEachStage.receive(FunctionEachStage.java:100)
>         at cascading.flow.stream.element.FunctionEachStage.receive(FunctionEachStage.java:40)
>         at cascading.flow.stream.element.FunctionEachStage$1.collect(FunctionEachStage.java:81)
>         at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
>         at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
>         at com.twitter.scalding.MapFunction.operate(Operations.scala:60)
>         at cascading.flow.stream.element.FunctionEachStage.receive(FunctionEachStage.java:100)
>         at cascading.flow.stream.element.FunctionEachStage.receive(FunctionEachStage.java:40)
>         at cascading.flow.stream.element.FunctionEachStage$1.collect(FunctionEachStage.java:81)
>         at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
>         at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
>         at cascading.operation.Identity$1.operate(Identity.java:124)
>         at cascading.operation.Identity.operate(Identity.java:150)
>         at cascading.flow.stream.element.FunctionEachStage.receive(FunctionEachStage.java:100)
>         at cascading.flow.stream.element.FunctionEachStage.receive(FunctionEachStage.java:40)
>         at cascading.flow.stream.element.SourceStage.map(SourceStage.java:110)
>         at cascading.flow.stream.element.SourceStage.run(SourceStage.java:66)
>         at cascading.flow.tez.stream.element.TezSourceStage.run(TezSourceStage.java:95)
>         at cascading.flow.tez.FlowProcessor.run(FlowProcessor.java:165)
>         at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:337)
>         at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:179)
>         at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:171)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>         at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:171)
>         at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:167)
>         at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: cascading.CascadingException: unable to read element from underlying stream
>         at cascading.tuple.hadoop.util.TupleElementComparator.compare(TupleElementComparator.java:82)
>         at cascading.tuple.hadoop.util.TupleElementComparator.compare(TupleElementComparator.java:33)
>         at cascading.tuple.hadoop.util.DeserializerComparator.compareTuples(DeserializerComparator.java:160)
>         ... 55 more
> Caused by: java.io.EOFException
>         at java.io.DataInputStream.readFully(DataInputStream.java:197)
>         at java.io.DataInputStream.readFully(DataInputStream.java:169)
>         at org.apache.hadoop.io.WritableUtils.readString(WritableUtils.java:125)
>         at cascading.tuple.hadoop.io.HadoopTupleInputStream.readString(HadoopTupleInputStream.java:75)
>         at cascading.tuple.hadoop.io.HadoopTupleInputStream.readType(HadoopTupleInputStream.java:85)
>         at cascading.tuple.hadoop.io.HadoopTupleInputStream.getNextElement(HadoopTupleInputStream.java:52)
>         at cascading.tuple.hadoop.util.TupleElementComparator.compare(TupleElementComparator.java:77)
>         ... 57 more
> {code}
> with an apparently random variation at the top of the stack, which is
> {code}
> 2015-05-28 13:10:13,459 ERROR [TezChild] element.TrapHandler: caught Throwable, no trap available, rethrowing
> cascading.CascadingException: java.io.EOFException
>         at cascading.tuple.hadoop.util.TupleComparator.compare(TupleComparator.java:42)
>         at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SortSpan.compareKeys(PipelinedSorter.java:669)
>         at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SortSpan.compare(PipelinedSorter.java:684)
>         at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:99)
>         at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:63)
>         at org.apache.tez.runtime.library.common.sort.impl.PipelinedSorter$SortSpan.sort(PipelinedSorter.java:631)
> {code}
> All running TezChildren fail with the same stacks (either variant) at the same time on the same node, which is straight off an HDFS-backed CSV file.
> The cascading.tuple.hadoop.util.TupleComparator#compare code at the top of the stack has been in use in a MAPREDUCE context for over 2.5 years; a first analysis with [~cwensel] (who successfully reproduced the issue without Scalding) points towards an issue on the Tez side.
> As a workaround, it is possible to run with {code:scala}"tez.runtime.sorter.class" -> "LEGACY"{code}, but this is impractical in the long run.


