Posted to mapreduce-user@hadoop.apache.org by Noriyo Koyama <ca...@gmail.com> on 2016/02/18 19:06:40 UTC

IOException Unexpected extra bytes from input stream @ InMemoryMapOutput.java:111

Hello, Hadoop experts!

 I need your help with the following problem I am facing. I really
appreciate it!
I developed a Java MapReduce application that reads gzipped (.gz)
input files. It works fine in a test environment on Skytap running
2.6.0-cdh5.5.2. However, when I deploy it to a different environment
with the same version (2.6.0-cdh5.5.2) installed, I get the error
below. The main difference I can see is the size of the gz files
being read: roughly 10 GB - 17 GB. The IOException is thrown at
InMemoryMapOutput.java:111, but looking at that code gives me no clue
what to check. If you have any suggestions on where to look, in my
Java code or in the Hadoop settings, I would really appreciate your
help!
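
For context, here is a minimal sketch of the shape of my job; the
class names and the line-count logic are placeholders, not my actual
code:

<code>
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GzLineCount {

  // The default TextInputFormat decompresses .gz inputs transparently
  // via GzipCodec, so the mapper only ever sees plain text lines.
  public static class LineMapper
      extends Mapper<LongWritable, Text, Text, LongWritable> {
    private static final Text KEY = new Text("lines");
    private static final LongWritable ONE = new LongWritable(1);

    @Override
    protected void map(LongWritable offset, Text line, Context ctx)
        throws IOException, InterruptedException {
      ctx.write(KEY, ONE);
    }
  }

  public static class SumReducer
      extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context ctx)
        throws IOException, InterruptedException {
      long sum = 0;
      for (LongWritable v : values) {
        sum += v.get();
      }
      ctx.write(key, new LongWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "gz line count");
    job.setJarByClass(GzLineCount.class);
    job.setMapperClass(LineMapper.class);
    job.setReducerClass(SumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // directory of .gz files
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
</code>

One thing I noticed while writing this down: gzip is not a splittable
format, so each 10 GB - 17 GB file is processed by a single mapper and
produces one very large map output.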

—Error start
2016-02-18 05:38:20, WARN, org.apache.hadoop.mapred.LocalJobRunner,
Thread-581, java.lang.Exception:
org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in
shuffle in localfetcher#2
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError:
error in shuffle in localfetcher#2
at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unexpected extra bytes from input
stream for attempt_local1963416892_0013_m_000000_0
at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.shuffle(InMemoryMapOutput.java:111)
at org.apache.hadoop.mapreduce.task.reduce.LocalFetcher.copyMapOutput(LocalFetcher.java:155)
at org.apache.hadoop.mapreduce.task.reduce.LocalFetcher.doCopy(LocalFetcher.java:102)
at org.apache.hadoop.mapreduce.task.reduce.LocalFetcher.run(LocalFetcher.java:85)
, o.a.h.m.LocalJobRunner$Job, run job_local1963416892_0013
—Error end

Here is the source code where the exception is thrown; line 111 is the throw.
<code>
55   public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
56                            MergeManagerImpl<K, V> merger,
57                            int size, CompressionCodec codec,
58                            boolean primaryMapOutput) {
59     super(mapId, (long)size, primaryMapOutput);
60     this.conf = conf;
61     this.merger = merger;
62     this.codec = codec;
63     byteStream = new BoundedByteArrayOutputStream(size);
64     memory = byteStream.getBuffer();
65     if (codec != null) {
66       decompressor = CodecPool.getDecompressor(codec);
67     } else {
68       decompressor = null;
69     }
70   }
71
72   public byte[] getMemory() {
73     return memory;
74   }
75
76   public BoundedByteArrayOutputStream getArrayStream() {
77     return byteStream;
78   }
79
80   @Override
81   public void shuffle(MapHost host, InputStream input,
82                       long compressedLength, long decompressedLength,
83                       ShuffleClientMetrics metrics,
84                       Reporter reporter) throws IOException {
85     IFileInputStream checksumIn =
86       new IFileInputStream(input, compressedLength, conf);
87
88     input = checksumIn;
89
90     // Are map-outputs compressed?
91     if (codec != null) {
92       decompressor.reset();
93       input = codec.createInputStream(input, decompressor);
94     }
95
96     try {
97       IOUtils.readFully(input, memory, 0, memory.length);
98       metrics.inputBytes(memory.length);
99       reporter.progress();
100      LOG.info("Read " + memory.length + " bytes from map-output for " +
101                getMapId());
102      /**
103       * We've gotten the amount of data we were expecting. Verify the
104       * decompressor has nothing more to offer. This action also forces
105       * the decompressor to read any trailing bytes that weren't
106       * critical for decompression, which is necessary to keep the
107       * stream in sync.
108       */
109
110      if (input.read() >= 0 ) {
111        throw new IOException("Unexpected extra bytes from input stream for " +
112                               getMapId());
113      }
</code>
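
For what it's worth, whether "codec" is non-null in that shuffle path
depends on whether map-output compression is enabled for the job.
A minimal sketch of toggling it, assuming the standard Hadoop 2.x
property names (I have not confirmed what the cluster's
mapred-site.xml sets):

<code>
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

public class MapOutputCompressionSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Compress intermediate map outputs; the reduce-side shuffle then
    // decompresses each segment, which is the codec != null branch in
    // InMemoryMapOutput.shuffle() above.
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setClass("mapreduce.map.output.compress.codec",
                  SnappyCodec.class, CompressionCodec.class);

    // To take the decompressor out of the picture while debugging,
    // it can be disabled instead:
    // conf.setBoolean("mapreduce.map.output.compress", false);

    System.out.println("map output compress = "
        + conf.get("mapreduce.map.output.compress"));
  }
}
</code>

My thinking is that if the error disappears with map-output
compression disabled, that would narrow the problem to the
decompression side of the shuffle rather than my job logic. Does that
sound reasonable?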

Best Regards,
 Nori

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@hadoop.apache.org
For additional commands, e-mail: user-help@hadoop.apache.org