Posted to mapreduce-user@hadoop.apache.org by Noriyo Koyama <ca...@gmail.com> on 2016/02/18 19:06:40 UTC
IOException Unexpected extra bytes from input stream @ InMemoryMapOutput.java:111
Hello, Hadoop experts!
I need help with the following problem and would really appreciate
any advice.
I developed a Java application using MapReduce that reads gz files. It
works fine in a test environment on Skytap with 2.6.0-cdh5.5.2.
However, when I deploy it in a different environment where the same
version, 2.6.0-cdh5.5.2, is installed, I get the error below.
The one difference I know of is that the gz files it tries to read are
bigger, around 10GB - 17GB. I have looked at InMemoryMapOutput.java:111,
where the IOException is thrown, but I don't have any clue what I should
check. If you have any suggestions on where to look, in the Java code or
the Hadoop settings, I would really appreciate your help!
—Error start
2016-02-18 05:38:20, WARN, org.apache.hadoop.mapred.LocalJobRunner,
Thread-581, java.lang.Exception:
org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in
shuffle in localfetcher#2
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError:
error in shuffle in localfetcher#2
at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unexpected extra bytes from input
stream for attempt_local1963416892_0013_m_000000_0
at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.shuffle(InMemoryMapOutput.java:111)
at org.apache.hadoop.mapreduce.task.reduce.LocalFetcher.copyMapOutput(LocalFetcher.java:155)
at org.apache.hadoop.mapreduce.task.reduce.LocalFetcher.doCopy(LocalFetcher.java:102)
at org.apache.hadoop.mapreduce.task.reduce.LocalFetcher.run(LocalFetcher.java:85)
, o.a.h.m.LocalJobRunner$Job, run job_local1963416892_0013
—Error end
Below is the source code where the exception is thrown; line 111 is the one that throws it.
<code>
55 public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
56 MergeManagerImpl<K, V> merger,
57 int size, CompressionCodec codec,
58 boolean primaryMapOutput) {
59 super(mapId, (long)size, primaryMapOutput);
60 this.conf = conf;
61 this.merger = merger;
62 this.codec = codec;
63 byteStream = new BoundedByteArrayOutputStream(size);
64 memory = byteStream.getBuffer();
65 if (codec != null) {
66 decompressor = CodecPool.getDecompressor(codec);
67 } else {
68 decompressor = null;
69 }
70 }
71
72 public byte[] getMemory() {
73 return memory;
74 }
75
76 public BoundedByteArrayOutputStream getArrayStream() {
77 return byteStream;
78 }
79
80 @Override
81 public void shuffle(MapHost host, InputStream input,
82 long compressedLength, long decompressedLength,
83 ShuffleClientMetrics metrics,
84 Reporter reporter) throws IOException {
85 IFileInputStream checksumIn =
86 new IFileInputStream(input, compressedLength, conf);
87
88 input = checksumIn;
89
90 // Are map-outputs compressed?
91 if (codec != null) {
92 decompressor.reset();
93 input = codec.createInputStream(input, decompressor);
94 }
95
96 try {
97 IOUtils.readFully(input, memory, 0, memory.length);
98 metrics.inputBytes(memory.length);
99 reporter.progress();
100 LOG.info("Read " + memory.length + " bytes from map-output for " +
101 getMapId());
// We've gotten the amount of data we were expecting. Verify the
// decompressor has nothing more to offer. This action also forces the
// decompressor to read any trailing bytes that weren't critical for
// decompression, which is necessary to keep the stream in sync.
109
110 if (input.read() >= 0 ) {
111 throw new IOException("Unexpected extra bytes from input stream for " +
112 getMapId());
113 }
</code>
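To spell out what I understand the check around line 111 to be doing, here is a minimal standalone sketch that uses only java.io. The class name ExtraBytesCheck and the method readExactly are hypothetical names of my own, not Hadoop code, and the sketch leaves out the checksum and decompression wrappers. It fills a buffer of the expected size, then treats any further byte on the stream as an error, so a stream that holds more data than the declared size fails in the same way as in the trace above.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class ExtraBytesCheck {

    // Fill the whole buffer from the stream, then verify the stream is exhausted.
    // This mirrors the pattern in the excerpt, not the actual Hadoop implementation.
    static void readExactly(InputStream in, byte[] buffer) throws IOException {
        int off = 0;
        while (off < buffer.length) {
            int n = in.read(buffer, off, buffer.length - off);
            if (n < 0) {
                throw new EOFException("Premature EOF after " + off + " bytes");
            }
            off += n;
        }
        // Any byte still available means the stream was longer than expected.
        if (in.read() >= 0) {
            throw new IOException("Unexpected extra bytes from input stream");
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3, 4, 5};

        // Expected size matches the stream length: no exception.
        readExactly(new ByteArrayInputStream(data), new byte[5]);
        System.out.println("exact size: ok");

        // Expected size is smaller than the stream length: the check fails.
        try {
            readExactly(new ByteArrayInputStream(data), new byte[4]);
        } catch (IOException e) {
            System.out.println("smaller size: " + e.getMessage());
        }
    }
}
```

If this reading is right, the exception means the stream handed to shuffle() carried more bytes than the size it was told to expect, which is why I suspect the larger input files matter, although I have not confirmed that.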
Best Regards,
Nori