Posted to user@hadoop.apache.org by "Wright, Eric" <ew...@SoleoCommunications.com> on 2014/04/28 17:39:30 UTC

Map Reduce Error

Currently, we are working on setting up a production Hadoop cluster. We have a smaller cluster of four nodes that we are using for development (nn, snn, and two datanodes). At a high level, we use Flume to load our data into HDFS. Because Flume may resend data on error (a consequence of how it handles its reliability guarantees), it is possible to get duplicate records. In our case, our batch size is 100, so we occasionally see 100 records duplicated. To remedy this, we de-duplicate the data using a MapReduce job.

For this job, the mapper has the following generic parameters <LongWritable, Text, Text, Text>

LongWritable - byte offset from beginning of file
Text - Line of text currently being processed
Text - Line of text currently being processed
Text - Output File Indication

And the reducer has the following generic parameters <Text, Text, NullWritable, Text>

Text - Line of text currently being processed
Text - Output File Indication
NullWritable - Empty key
Text - Line of text currently being processed

We read the standard LongWritable, Text input key and value types provided by TextInputFormat coming into the mapper. The mapper emits its input value as the output key (i.e. the full text string passed in as the value to the mapper). This means the reducer receives Text, Text as input: the reducer key is the line of text, and the value is a destination file. Because it uses MultipleOutputs, the reducer uses the value it received from the mapper to determine the output file.

So, our map phase reads the strings and pushes the lines out as the keys, so that the sort and partition phases send any duplicate lines to a single reducer, and the reducer then writes each key (line) to its output file no more than once.
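
For readers following along, the flow described above can be sketched in plain Java. This is only a simulation of the map, shuffle and reduce steps, not our actual job and not Hadoop API code: the class name DedupSimulation, the {line, destination} pair layout, and the choice to take the destination from the first value seen for a line are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of the de-duplication flow; no Hadoop classes are used. */
final class DedupSimulation {
    private DedupSimulation() {}

    /**
     * @param records hypothetical {line, destination} pairs; the destination stands in
     *                for the "output file indication" the mapper emits as its value
     * @return destination -> distinct lines, each line written at most once
     */
    static Map<String, List<String>> dedup(List<String[]> records) {
        // "Map" + "shuffle": key every record by its full line, so that all
        // duplicates of a line end up in one group. TreeMap mimics sorted keys.
        Map<String, List<String>> shuffled = new TreeMap<>();
        for (String[] record : records) {
            shuffled.computeIfAbsent(record[0], k -> new ArrayList<>()).add(record[1]);
        }

        // "Reduce": one pass per distinct line. The line is written once, to the
        // destination carried by the first value (an assumption of this sketch).
        Map<String, List<String>> outputs = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> group : shuffled.entrySet()) {
            String destination = group.getValue().get(0);
            outputs.computeIfAbsent(destination, k -> new ArrayList<>()).add(group.getKey());
        }
        return outputs;
    }
}
```

Note that every distinct destination in the result corresponds to one output file, so the number of distinct destinations is what drives how many files the reducers create.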

Again, this does work at smaller scales, but when we increase our payload, we start getting numerous IO errors. It fails when we start processing tens of GB of data. I have tried numerous different things and believe that either our configuration is at fault or we are doing something fundamentally wrong.

Any advice or things to check would be greatly appreciated. I'd be happy to provide more information if it would help to diagnose the problem.



Currently collected information:

[analytics@bi-data-12 ~]$ hadoop version
Hadoop 2.2.0.2.0.6.0-102
Subversion git@github.com:hortonworks/hadoop.git -r 02ad68f19849a0a986dac36b705491f6791f9179
Compiled by jenkins on 2014-01-21T00:56Z
Compiled with protoc 2.5.0
From source with checksum 66f6c486e27479105979740178fbf0
This command was run using /usr/lib/hadoop/hadoop-common-2.2.0.2.0.6.0-102.jar
[analytics@bi-data-12 ~]$


From the datanode service logs:
2014-04-25 13:23:00,349 ERROR datanode.DataNode (DataXceiver.java:writeBlock(540)) - DataNode{data=FSDataset{dirpath='[/disk1/hdfs/data/current, /disk2/hdfs/data/current, /disk3/hdfs/data/current, /disk4/hdfs/data/current]'}, localName='bi-data-12.soleocommunications.com:50010', storageID='DS-225705953-10.1.21.12-50010-1394477125650', xmitsInProgress=1}:Exception transfering block BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 to mirror 10.1.21.10:50010: java.io.EOFException: Premature EOF: no length prefix available
2014-04-25 13:23:00,349 WARN  datanode.DataNode (DataXceiverServer.java:run(155)) - bi-data-12.soleocommunications.com:50010:DataXceiverServer:
java.io.IOException: Xceiver count 4097 exceeds the limit of concurrent xcievers: 4096
        at org.apache.hadoop.hdfs.server.datanode.DataXceiverServer.run(DataXceiverServer.java:137)
        at java.lang.Thread.run(Thread.java:722)
2014-04-25 13:23:00,349 INFO  datanode.DataNode (DataXceiver.java:writeBlock(600)) - opWriteBlock BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 received exception java.io.EOFException: Premature EOF: no length prefix available
2014-04-25 13:23:00,350 ERROR datanode.DataNode (DataXceiver.java:run(225)) - bi-data-12.soleocommunications.com:50010:DataXceiver error processing WRITE_BLOCK operation  src: /10.1.21.12:37307 dest: /10.1.21.12:50010
java.io.EOFException: Premature EOF: no length prefix available
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:511)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:115)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:68)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:221)
        at java.lang.Thread.run(Thread.java:722)
2014-04-25 13:23:00,352 INFO  datanode.DataNode (DataXceiver.java:writeBlock(432)) - Receiving BP-1078665828-10.1.21.30-1394476780595:blk_1073845357_123422 src: /10.1.21.12:37311 dest: /10.1.21.12:50010



From the map job logs we see exceptions like:

2014-04-25 13:04:16,621 INFO [Thread-147] org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:218)
        at sun.nio.ch.IOUtil.read(IOUtil.java:191)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:359)
        at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1160)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
...
2014-04-25 13:04:53,116 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending local=/10.1.21.10:43973 remote=/10.1.21.10:50010]
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
        at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1308)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1133)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
2014-04-25 13:04:53,116 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Abandoning BP-1078665828-10.1.21.30-1394476780595:blk_1073840493_118558
2014-04-25 13:04:53,117 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Excluding datanode 10.1.21.10:50010
2014-04-25 13:06:33,056 WARN [ResponseProcessor for block BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012] org.apache.hadoop.hdfs.DFSClient: DFSOutputStream ResponseProcessor exception  for block BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012
java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.1.21.10:49537 remote=/10.1.21.12:50010]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:721)
2014-04-25 13:07:18,519 ERROR [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:mapred (auth:SIMPLE) cause:java.io.IOException: All datanodes 10.1.21.12:50010 are bad. Aborting...
2014-04-25 13:07:19,559 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: All datanodes 10.1.21.12:50010 are bad. Aborting...
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1008)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)

...

2014-04-25 13:32:40,197 ERROR [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:mapred (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
...

2014-04-25 13:32:40,198 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)


RE: Map Reduce Error

Posted by "Wright, Eric" <ew...@SoleoCommunications.com>.
Just an update. For those who are curious, it turns out that we were polluting the HDFS layer with thousands of small files, based on a misunderstanding of what was going on with our data pipelines. Thank you, Divye, as your comment to look for leaking file descriptors led me down the path that identified this. We have cleaned up this issue, and to date it has been running well.

Eric Wright
Software Engineer
S O L E O
Telephone: +1-585-641-4300 x0069
Web:  www.soleo.com


From: divye sheth [mailto:divs.sheth@gmail.com]
Sent: Tuesday, April 29, 2014 7:29 AM
To: user@hadoop.apache.org
Subject: Re: Map Reduce Error

Hi Eric,

IMHO, one solution is to increase the xcievers count in hdfs-site.xml, though this might cost you some performance:

<property>
<name>dfs.datanode.max.xcievers</name>
<value>4096</value>
</property>

To get a better understanding of how xcievers work, read this post:
http://blog.cloudera.com/blog/2012/03/hbase-hadoop-xceivers/

Also make sure you do not have a leak in your code where there are open file descriptors.
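
One rough way to check for such a leak is to count the handles that have been opened but not yet closed. The sketch below is plain Java and does not use any Hadoop class; HandleTracker and its methods are hypothetical names invented for this illustration, and it assumes every stream the job opens is passed through track().

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper (not a Hadoop class): counts handles opened but not yet closed. */
final class HandleTracker {
    private final AtomicInteger open = new AtomicInteger();

    /** Wraps a handle; closing the wrapper closes it and decrements the count once. */
    Closeable track(Closeable inner) {
        open.incrementAndGet();
        AtomicBoolean closed = new AtomicBoolean(false);
        return () -> {
            // Guard against double close so the counter is never decremented twice.
            if (closed.compareAndSet(false, true)) {
                try {
                    inner.close();
                } finally {
                    open.decrementAndGet();
                }
            }
        };
    }

    /** Number of tracked handles that are still open. */
    int openCount() {
        return open.get();
    }
}
```

If openCount() is still above zero when a task finishes, or keeps growing while it runs, something is holding streams open. Wrapping each stream in a try-with-resources block is the usual way to make sure it gets closed.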

Thanks
Divye Sheth

RE: Map Reduce Error

Posted by "Wright, Eric" <ew...@SoleoCommunications.com>.
Just an update. For those that are curious, it turns out that we polluting the HDFS layer with thousands of small files based on a mis-understanding of what was going on with our data pipelines. Thank you Divye, as you comment to look for leaking file descriptors lead me down the path that identified this. We have cleaned up this issue and as of to date, it has been running well.

Eric Wright
Software Engineer
S O L E O
Telephone: +1-585-641-4300 x0069
Web:  www.soleo.com<http://www.soleo.com/>


From: divye sheth [mailto:divs.sheth@gmail.com]
Sent: Tuesday, April 29, 2014 7:29 AM
To: user@hadoop.apache.org
Subject: Re: Map Reduce Error

Hi Eric,

IMHO you do have a solution by increasing the xcievers count in hdfs-site.xml, but this might give you a performance hit

<property>
<name>dfs.datanode.max.xcievers</name>
<value>4096</value>
</property>

To get a better understanding of how xcievers work go through this link:
http://blog.cloudera.com/blog/2012/03/hbase-hadoop-xceivers/

Also make sure you do not have a leak in your code where there are open file descriptors.

Thanks
Divye Sheth

On Mon, Apr 28, 2014 at 9:09 PM, Wright, Eric <ew...@soleocommunications.com>> wrote:
Currently, we are working on setting up a production hadoop cluster. We have a smaller cluster of four nodes we are working on for development  (nn, snn, and two datanodes). From a high level, we flume our data into HDFS, and then due to the way that flume may resend data on error (due to the way it handles it’s reliability guarantees), it is possible to get duplicate records. In our case, our batch size is 100 so we occasionally see 100 records duplicated. In order to remedy this, we de-duplicate the data using a map reduce job.

For this job, the mapper has the following generic parameters <LongWriteable, Text, Text, Text>

LongWriteable – byte offset from beginning of file
Text – Line of text currently being processed
Text – Line of text currently being processed
Text – Output File Indication

And the reducer has the following generic parameters <Text, Text, NullWriteable, Text>
Text – Line of text currently being processed
Text – Output File Indication
NullWriteable – Empty key
Text – Line of text currently being processed

We read the standard LongWriteable, Text as provided by a TextFileInputFormat input key, value types coming into the mapper. We emit from the mapper the value as the key (ie. the full text string it passed in as the value to the mapper). This would essentially mean that the reducer will receive as input Text, Text. The reducer key is the string of text, and the value is a destination file. The reducer uses the value it received from the mapper to determine an output file as it uses MultipleOutputs.

So, our map phase reads the strings, pushes the lines out as the keys so that the sort and partition phase send any duplicate lines to a single reducer, and then the reducer writes no more than one key (line) to its output file.

Again, this does work at smaller scales, but when we increase our payload, we start getting numerous IO errors. It fails when we start processing 10’s of GB’s of data. I have tried numerous different things and believe that either our configuration is an issue or we are doing something fundamentally wrong.

Any advice or things to check would be greatly appreciated. I’d be happy to provide more information if it would help to diagnose the problem.



Currently collected information:

[analytics@bi-data-12 ~]$ hadoop version
Hadoop 2.2.0.2.0.6.0-102
Subversion git@github.com:hortonworks/hadoop.git<mailto:git@github.com:hortonworks/hadoop.git> -r 02ad68f19849a0a986dac36b705491f6791f9179
Compiled by jenkins on 2014-01-21T00:56Z
Compiled with protoc 2.5.0
From source with checksum 66f6c486e27479105979740178fbf0
This command was run using /usr/lib/hadoop/hadoop-common-2.2.0.2.0.6.0-102.jar
[analytics@bi-data-12 ~]$


From the datanode service logs:
2014-04-25 13:23:00,349 ERROR datanode.DataNode (DataXceiver.java:writeBlock(540)) - DataNode{data=FSDataset{dirpath='[/disk1/hdfs/data/current, /disk2/hdfs/data/current, /disk3/hdfs/data/current, /disk4/hdfs/data/current]'}, localName='bi-data-12.soleocommunications.com:50010<http://bi-data-12.soleocommunications.com:50010>', storageID='DS-225705953-10.1.21.12-50010-1394477125650', xmitsInProgress=1}:Exception transfering block BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 to mirror 10.1.21.10:50010<http://10.1.21.10:50010>: java.io.EOFException: Premature EOF: no length prefix available
2014-04-25 13:23:00,349 WARN  datanode.DataNode (DataXceiverServer.java:run(155)) - bi-data-12.soleocommunications.com:50010:DataXceiverServer:
java.io.IOException: Xceiver count 4097 exceeds the limit of concurrent xcievers: 4096
        at org.apache.hadoop.hdfs.server.datanode.DataXceiverServer.run(DataXceiverServer.java:137)
        at java.lang.Thread.run(Thread.java:722)
2014-04-25 13:23:00,349 INFO  datanode.DataNode (DataXceiver.java:writeBlock(600)) - opWriteBlock BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 received exception java.io.EOFException: Premature EOF: no length prefix available
2014-04-25 13:23:00,350 ERROR datanode.DataNode (DataXceiver.java:run(225)) - bi-data-12.soleocommunications.com:50010:DataXceiver error processing WRITE_BLOCK operation  src: /10.1.21.12:37307 dest: /10.1.21.12:50010
java.io.EOFException: Premature EOF: no length prefix available
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:511)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:115)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:68)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:221)
        at java.lang.Thread.run(Thread.java:722)
2014-04-25 13:23:00,352 INFO  datanode.DataNode (DataXceiver.java:writeBlock(432)) - Receiving BP-1078665828-10.1.21.30-1394476780595:blk_1073845357_123422 src: /10.1.21.12:37311 dest: /10.1.21.12:50010
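
The key line in the datanode log above is the "Xceiver count 4097 exceeds the limit of concurrent xcievers: 4096" warning. A minimal sketch for finding how often a datanode hits that limit, assuming the message wording matches the excerpt above (the function name is hypothetical):

```python
import re

# Pattern copied from the DataXceiverServer message in the log excerpt;
# "xcievers" is the spelling the log itself uses.
XCEIVER_LIMIT = re.compile(
    r"Xceiver count (\d+) exceeds the limit of concurrent xcievers: (\d+)"
)


def xceiver_limit_hits(log_lines):
    """Return a (count, limit) tuple for every line that reports the limit being exceeded."""
    hits = []
    for line in log_lines:
        match = XCEIVER_LIMIT.search(line)
        if match:
            hits.append((int(match.group(1)), int(match.group(2))))
    return hits
```

Feeding it the lines of a datanode log (for example `xceiver_limit_hits(open(path))`, where `path` points at the log file) gives one tuple per occurrence. Many hits within a short time window suggest far more concurrent block reads and writes than the datanode is configured to serve.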



From the map job logs we see exceptions like:

2014-04-25 13:04:16,621 INFO [Thread-147] org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:218)
        at sun.nio.ch.IOUtil.read(IOUtil.java:191)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:359)
        at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1160)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
…
2014-04-25 13:04:53,116 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending local=/10.1.21.10:43973 remote=/10.1.21.10:50010]
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
        at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1308)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1133)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
2014-04-25 13:04:53,116 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Abandoning BP-1078665828-10.1.21.30-1394476780595:blk_1073840493_118558
2014-04-25 13:04:53,117 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Excluding datanode 10.1.21.10:50010
2014-04-25 13:06:33,056 WARN [ResponseProcessor for block BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012] org.apache.hadoop.hdfs.DFSClient: DFSOutputStream ResponseProcessor exception  for block BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012
java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.1.21.10:49537 remote=/10.1.21.12:50010]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:721)
2014-04-25 13:07:18,519 ERROR [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:mapred (auth:SIMPLE) cause:java.io.IOException: All datanodes 10.1.21.12:50010 are bad. Aborting...
2014-04-25 13:07:19,559 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: All datanodes 10.1.21.12:50010 are bad. Aborting...
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1008)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)

…

2014-04-25 13:32:40,197 ERROR [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:mapred (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
…

2014-04-25 13:32:40,198 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)



RE: Map Reduce Error

Posted by "Wright, Eric" <ew...@SoleoCommunications.com>.
Just an update. For those who are curious, it turns out that we were polluting the HDFS layer with thousands of small files, based on a misunderstanding of what was going on with our data pipelines. Thank you Divye, as your comment to look for leaking file descriptors led me down the path that identified this. We have cleaned up this issue, and to date the job has been running well.
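
A rough way to check for this kind of small-file build-up is to count small files per directory in a recursive listing. The sketch below is an illustration only: it assumes the input is the text output of `hdfs dfs -ls -R` with its usual eight columns (permissions, replication, owner, group, size, date, time, path), and the function name and the 1 MiB threshold are hypothetical choices.

```python
import posixpath
from collections import Counter


def small_files_by_dir(ls_lines, max_bytes=1024 * 1024):
    """Count files smaller than max_bytes in each parent directory."""
    counts = Counter()
    for line in ls_lines:
        fields = line.split(None, 7)  # path is the 8th field and may contain spaces
        if len(fields) < 8 or fields[0].startswith("d"):
            continue  # skip directories, "Found N items" headers, and blank lines
        if not fields[4].isdigit():
            continue  # not a size column, so not a listing line we understand
        size, path = int(fields[4]), fields[7].rstrip("\n")
        if size < max_bytes:
            counts[posixpath.dirname(path)] += 1
    return counts
```

Calling `small_files_by_dir(lines).most_common(10)` on the listing then shows the directories holding the most small files, which is where a pipeline that creates one file per record type or per batch tends to show up.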

Eric Wright
Software Engineer
S O L E O
Telephone: +1-585-641-4300 x0069
Web:  www.soleo.com


From: divye sheth [mailto:divs.sheth@gmail.com]
Sent: Tuesday, April 29, 2014 7:29 AM
To: user@hadoop.apache.org
Subject: Re: Map Reduce Error

Hi Eric,

IMHO you can work around this by increasing the xcievers count in hdfs-site.xml, but this might cost you some performance:

<property>
  <name>dfs.datanode.max.xcievers</name>
  <value>4096</value>
</property>
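
Note that the datanode log in the original message already reports a limit of 4096, so the value would have to be set higher than that to change anything. As far as I know, Hadoop 2.x names this setting dfs.datanode.max.transfer.threads and keeps the older dfs.datanode.max.xcievers spelling as a deprecated alias. A minimal sketch for checking which of the two names a given hdfs-site.xml sets, where the function name is hypothetical and the file contents are passed in as a string:

```python
import xml.etree.ElementTree as ET

# Newer name first, then the older spelling used in the snippet above.
XCEIVER_KEYS = ("dfs.datanode.max.transfer.threads", "dfs.datanode.max.xcievers")


def configured_xceiver_limit(hdfs_site_xml):
    """Return (property_name, value) for the first xceiver limit found, else None."""
    root = ET.fromstring(hdfs_site_xml)
    props = {}
    for prop in root.iter("property"):
        name = (prop.findtext("name") or "").strip()
        value = (prop.findtext("value") or "").strip()
        props[name] = value
    for key in XCEIVER_KEYS:
        if key in props:
            return key, int(props[key])
    return None
```

A result of `None` means neither name is set in that file, in which case the datanode falls back to its built-in default.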

To get a better understanding of how xcievers work, go through this link:
http://blog.cloudera.com/blog/2012/03/hbase-hadoop-xceivers/

Also make sure your code does not leak open file descriptors.

Thanks
Divye Sheth


RE: Map Reduce Error

Posted by "Wright, Eric" <ew...@SoleoCommunications.com>.
Just an update. For those that are curious, it turns out that we polluting the HDFS layer with thousands of small files based on a mis-understanding of what was going on with our data pipelines. Thank you Divye, as you comment to look for leaking file descriptors lead me down the path that identified this. We have cleaned up this issue and as of to date, it has been running well.

Eric Wright
Software Engineer
S O L E O
Telephone: +1-585-641-4300 x0069
Web:  www.soleo.com<http://www.soleo.com/>


From: divye sheth [mailto:divs.sheth@gmail.com]
Sent: Tuesday, April 29, 2014 7:29 AM
To: user@hadoop.apache.org
Subject: Re: Map Reduce Error

Hi Eric,

IMHO you do have a solution by increasing the xcievers count in hdfs-site.xml, but this might give you a performance hit

<property>
<name>dfs.datanode.max.xcievers</name>
<value>4096</value>
</property>

To get a better understanding of how xcievers work go through this link:
http://blog.cloudera.com/blog/2012/03/hbase-hadoop-xceivers/

Also make sure you do not have a leak in your code where there are open file descriptors.

Thanks
Divye Sheth

On Mon, Apr 28, 2014 at 9:09 PM, Wright, Eric <ew...@soleocommunications.com>> wrote:
Currently, we are working on setting up a production hadoop cluster. We have a smaller cluster of four nodes we are working on for development  (nn, snn, and two datanodes). From a high level, we flume our data into HDFS, and then due to the way that flume may resend data on error (due to the way it handles it’s reliability guarantees), it is possible to get duplicate records. In our case, our batch size is 100 so we occasionally see 100 records duplicated. In order to remedy this, we de-duplicate the data using a map reduce job.

For this job, the mapper has the following generic parameters <LongWriteable, Text, Text, Text>

LongWriteable – byte offset from beginning of file
Text – Line of text currently being processed
Text – Line of text currently being processed
Text – Output File Indication

And the reducer has the following generic parameters <Text, Text, NullWriteable, Text>
Text – Line of text currently being processed
Text – Output File Indication
NullWriteable – Empty key
Text – Line of text currently being processed

We read the standard LongWriteable, Text as provided by a TextFileInputFormat input key, value types coming into the mapper. We emit from the mapper the value as the key (ie. the full text string it passed in as the value to the mapper). This would essentially mean that the reducer will receive as input Text, Text. The reducer key is the string of text, and the value is a destination file. The reducer uses the value it received from the mapper to determine an output file as it uses MultipleOutputs.

So, our map phase reads the strings, pushes the lines out as the keys so that the sort and partition phase send any duplicate lines to a single reducer, and then the reducer writes no more than one key (line) to its output file.

Again, this does work at smaller scales, but when we increase our payload, we start getting numerous IO errors. It fails when we start processing 10’s of GB’s of data. I have tried numerous different things and believe that either our configuration is an issue or we are doing something fundamentally wrong.

Any advice or things to check would be greatly appreciated. I’d be happy to provide more information if it would help to diagnose the problem.



Currently collected information:

[analytics@bi-data-12 ~]$ hadoop version
Hadoop 2.2.0.2.0.6.0-102
Subversion git@github.com:hortonworks/hadoop.git<mailto:git@github.com:hortonworks/hadoop.git> -r 02ad68f19849a0a986dac36b705491f6791f9179
Compiled by jenkins on 2014-01-21T00:56Z
Compiled with protoc 2.5.0
From source with checksum 66f6c486e27479105979740178fbf0
This command was run using /usr/lib/hadoop/hadoop-common-2.2.0.2.0.6.0-102.jar
[analytics@bi-data-12 ~]$


From the datanode service logs:
2014-04-25 13:23:00,349 ERROR datanode.DataNode (DataXceiver.java:writeBlock(540)) - DataNode{data=FSDataset{dirpath='[/disk1/hdfs/data/current, /disk2/hdfs/data/current, /disk3/hdfs/data/current, /disk4/hdfs/data/current]'}, localName='bi-data-12.soleocommunications.com:50010<http://bi-data-12.soleocommunications.com:50010>', storageID='DS-225705953-10.1.21.12-50010-1394477125650', xmitsInProgress=1}:Exception transfering block BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 to mirror 10.1.21.10:50010<http://10.1.21.10:50010>: java.io.EOFException: Premature EOF: no length prefix available
2014-04-25 13:23:00,349 WARN  datanode.DataNode (DataXceiverServer.java:run(155)) - bi-data-12.soleocommunications.com:50010:DataXceiverServer:
java.io.IOException: Xceiver count 4097 exceeds the limit of concurrent xcievers: 4096
        at org.apache.hadoop.hdfs.server.datanode.DataXceiverServer.run(DataXceiverServer.java:137)
        at java.lang.Thread.run(Thread.java:722)
2014-04-25 13:23:00,349 INFO  datanode.DataNode (DataXceiver.java:writeBlock(600)) - opWriteBlock BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 received exception java.io.EOFException: Premature EOF: no length prefix available
2014-04-25 13:23:00,350 ERROR datanode.DataNode (DataXceiver.java:run(225)) - bi-data-12.soleocommunications.com:50010:DataXceiver error processing WRITE_BLOCK operation  src: /10.1.21.12:37307<http://10.1.21.12:37307> dest: /10.1.21.12:50010<http://10.1.21.12:50010>
java.io.EOFException: Premature EOF: no length prefix available
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:511)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:115)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:68)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:221)
        at java.lang.Thread.run(Thread.java:722)
2014-04-25 13:23:00,352 INFO  datanode.DataNode (DataXceiver.java:writeBlock(432)) - Receiving BP-1078665828-10.1.21.30-1394476780595:blk_1073845357_123422 src: /10.1.21.12:37311<http://10.1.21.12:37311> dest: /10.1.21.12:50010<http://10.1.21.12:50010>



From the map job logs we see exceptions like:

2014-04-25 13:04:16,621 INFO [Thread-147] org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:218)
        at sun.nio.ch.IOUtil.read(IOUtil.java:191)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:359)
        at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1160)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
…
2014-04-25 13:04:53,116 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending local=/10.1.21.10:43973 remote=/10.1.21.10:50010]
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
        at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1308)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1133)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
2014-04-25 13:04:53,116 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Abandoning BP-1078665828-10.1.21.30-1394476780595:blk_1073840493_118558
2014-04-25 13:04:53,117 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Excluding datanode 10.1.21.10:50010
2014-04-25 13:06:33,056 WARN [ResponseProcessor for block BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012] org.apache.hadoop.hdfs.DFSClient: DFSOutputStream ResponseProcessor exception  for block BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012
java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.1.21.10:49537 remote=/10.1.21.12:50010]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:721)
2014-04-25 13:07:18,519 ERROR [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:mapred (auth:SIMPLE) cause:java.io.IOException: All datanodes 10.1.21.12:50010 are bad. Aborting...
2014-04-25 13:07:19,559 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: All datanodes 10.1.21.12:50010 are bad. Aborting...
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1008)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)

…

2014-04-25 13:32:40,197 ERROR [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:mapred (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
…
2014-04-25 13:32:40,198 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)



Re: Map Reduce Error

Posted by divye sheth <di...@gmail.com>.
Hi Eric,

IMHO you can work around this by raising the xcievers limit in
hdfs-site.xml, though this might cost you some performance:

<property>
<name>dfs.datanode.max.xcievers</name>
<value>4096</value>
</property>
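
As a sketch of the same setting under its newer name: in Hadoop 2.x the
property is called dfs.datanode.max.transfer.threads, and
dfs.datanode.max.xcievers is kept as a deprecated alias. The datanode log
quoted below already reports a limit of 4096, so the value would presumably
have to be higher than that to make a difference. The 8192 here is only an
illustrative assumption, not a tested recommendation.

```xml
<property>
  <!-- Newer name for the same limit; dfs.datanode.max.xcievers is the deprecated alias. -->
  <name>dfs.datanode.max.transfer.threads</name>
  <!-- Illustrative value only (assumption); size it for your own workload. -->
  <value>8192</value>
</property>
```

The datanodes need a restart to pick up a change to this setting.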

To get a better understanding of how xcievers work, go through this link:
http://blog.cloudera.com/blog/2012/03/hbase-hadoop-xceivers/

Also make sure your code does not leak open file descriptors, i.e. that
every output stream it opens is eventually closed.
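
To illustrate what I mean by not leaking, here is a minimal sketch in plain
Java. It is not your job's code and does not use the Hadoop API: the
OutputRegistry class and its WriterFactory interface are hypothetical names
made up for the example. The idea is only that each named output is opened
once, reused, and always closed when the work is done. In a reducer that uses
MultipleOutputs, the equivalent step would probably be calling
MultipleOutputs.close() from cleanup(), but I am assuming that, since I have
not seen your reducer.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: remembers every writer it opens so none is leaked. */
class OutputRegistry implements AutoCloseable {

    /** Hypothetical factory; a real job would open its output streams here. */
    interface WriterFactory {
        Writer open(String name) throws IOException;
    }

    private final WriterFactory factory;
    private final Map<String, Writer> writers = new HashMap<>();

    OutputRegistry(WriterFactory factory) {
        this.factory = factory;
    }

    /** Writes one line to the named output, opening it on first use only. */
    void write(String name, String line) throws IOException {
        Writer w = writers.get(name);
        if (w == null) {
            w = factory.open(name);
            writers.put(name, w);
        }
        w.write(line);
        w.write('\n');
    }

    /** Number of writers currently held open. */
    int openCount() {
        return writers.size();
    }

    /** Closes every writer, even if closing one of them fails. */
    @Override
    public void close() throws IOException {
        IOException first = null;
        for (Writer w : writers.values()) {
            try {
                w.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e; // remember the first failure, keep closing the rest
                }
            }
        }
        writers.clear();
        if (first != null) {
            throw first;
        }
    }
}

class OutputRegistryDemo {
    public static void main(String[] args) throws IOException {
        // In-memory writers stand in for real output files in this sketch.
        final Map<String, StringWriter> sink = new HashMap<>();
        try (OutputRegistry out = new OutputRegistry(name -> {
            StringWriter w = new StringWriter();
            sink.put(name, w);
            return w;
        })) {
            out.write("a", "line 1");
            out.write("b", "line 2");
            out.write("a", "line 3"); // reuses the writer already open for "a"
            System.out.println("open while running: " + out.openCount());
        } // try-with-resources closes every writer here, also on exceptions
        System.out.println("outputs created: " + sink.size());
    }
}
```

The point of the try-with-resources block is that the close happens on the
failure path too, which is where leaks usually come from.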

Thanks
Divye Sheth


On Mon, Apr 28, 2014 at 9:09 PM, Wright, Eric <
ewright@soleocommunications.com> wrote:

>  Currently, we are working on setting up a production hadoop cluster. We
> have a smaller cluster of four nodes we are working on for development
>  (nn, snn, and two datanodes). From a high level, we flume our data into
> HDFS, and then due to the way that flume may resend data on error (due to
> the way it handles its reliability guarantees), it is possible to get
> duplicate records. In our case, our batch size is 100 so we occasionally
> see 100 records duplicated. In order to remedy this, we de-duplicate the
> data using a map reduce job.
>
>
>
> For this job, the mapper has the following generic parameters
> <LongWritable, Text, Text, Text>
>
>
>
> LongWritable – byte offset from beginning of file
>
> Text – Line of text currently being processed
>
> Text – Line of text currently being processed
>
> Text – Output File Indication
>
>
>
> And the reducer has the following generic parameters <Text, Text,
> NullWritable, Text>
>
> Text – Line of text currently being processed
>
> Text – Output File Indication
>
> NullWritable – Empty key
>
> Text – Line of text currently being processed
>
>
>
> We read the standard LongWritable, Text key and value types provided by
> TextInputFormat coming into the mapper. We emit from the mapper the value
> as the key (i.e. the full text string passed in as the value to the
> mapper). This means that the reducer receives Text, Text as input. The
> reducer key is the string of text, and the value is a destination file.
> The reducer uses the value it received from the mapper to determine an
> output file, as it uses MultipleOutputs.
>
>
>
> So, our map phase reads the strings, pushes the lines out as the keys so
> that the sort and partition phase send any duplicate lines to a single
> reducer, and then the reducer writes no more than one key (line) to its
> output file.
>
>
>
> Again, this does work at smaller scales, but when we increase our payload,
> we start getting numerous IO errors. It fails when we start processing tens
> of GB of data. I have tried numerous different things and believe that either
> our configuration is an issue or we are doing something fundamentally
> wrong.
>
>
>
> Any advice or things to check would be greatly appreciated. I’d be happy
> to provide more information if it would help to diagnose the problem.
>
>
>
>
>
>
>
> *Currently collected information:*
>
>
>
> [analytics@bi-data-12 ~]$ hadoop version
>
> Hadoop 2.2.0.2.0.6.0-102
>
> Subversion git@github.com:hortonworks/hadoop.git -r
> 02ad68f19849a0a986dac36b705491f6791f9179
>
> Compiled by jenkins on 2014-01-21T00:56Z
>
> Compiled with protoc 2.5.0
>
> From source with checksum 66f6c486e27479105979740178fbf0
>
> This command was run using
> /usr/lib/hadoop/hadoop-common-2.2.0.2.0.6.0-102.jar
>
> [analytics@bi-data-12 ~]$
>
>
>
>
>
> From the datanode service logs:
>
> 2014-04-25 13:23:00,349 ERROR datanode.DataNode
> (DataXceiver.java:writeBlock(540)) -
> DataNode{data=FSDataset{dirpath='[/disk1/hdfs/data/current,
> /disk2/hdfs/data/current, /disk3/hdfs/data/current,
> /disk4/hdfs/data/current]'}, localName='
> bi-data-12.soleocommunications.com:50010',
> storageID='DS-225705953-10.1.21.12-50010-1394477125650',
> xmitsInProgress=1}:Exception transfering block
> BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 to mirror
> 10.1.21.10:50010: java.io.EOFException: Premature EOF: no length prefix
> available
>
> 2014-04-25 13:23:00,349 WARN  datanode.DataNode
> (DataXceiverServer.java:run(155)) - bi-data-12.soleocommunications.com:50010
> :DataXceiverServer:
>
> java.io.IOException: Xceiver count 4097 exceeds the limit of concurrent
> xcievers: 4096
>
>         at
> org.apache.hadoop.hdfs.server.datanode.DataXceiverServer.run(DataXceiverServer.java:137)
>
>         at java.lang.Thread.run(Thread.java:722)
>
> 2014-04-25 13:23:00,349 INFO  datanode.DataNode
> (DataXceiver.java:writeBlock(600)) - opWriteBlock
> BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 received
> exception java.io.EOFException: Premature EOF: no length prefix available
>
> 2014-04-25 13:23:00,350 ERROR datanode.DataNode
> (DataXceiver.java:run(225)) - bi-data-12.soleocommunications.com:50010:DataXceiver
> error processing WRITE_BLOCK operation  src: /10.1.21.12:37307 dest: /
> 10.1.21.12:50010
>
> java.io.EOFException: Premature EOF: no length prefix available
>
>         at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)
>
>         at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:511)
>
>         at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:115)
>
>         at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:68)
>
>         at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:221)
>
>         at java.lang.Thread.run(Thread.java:722)
>
> 2014-04-25 13:23:00,352 INFO  datanode.DataNode
> (DataXceiver.java:writeBlock(432)) - Receiving
> BP-1078665828-10.1.21.30-1394476780595:blk_1073845357_123422 src: /
> 10.1.21.12:37311 dest: /10.1.21.12:50010
>
>
>
>
>
>
>
> From the map job logs we see exceptions like:
>
> 2014-04-25 13:04:16,621 INFO [Thread-147] org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
>
> java.io.IOException: Connection reset by peer
>
>         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>
>         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>
>         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:218)
>
>         at sun.nio.ch.IOUtil.read(IOUtil.java:191)
>
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:359)
>
>         at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
>
>         at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
>
>         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>
>         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>
>         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
>
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>
>         at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
>
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1160)
>
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
>
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
>
> …
>
> 2014-04-25 13:04:53,116 INFO [Thread-590]
> org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
>
> org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while
> waiting for channel to be ready for connect. ch :
> java.nio.channels.SocketChannel[connection-pending local=/10.1.21.10:43973 remote=/10.1.21.10:50010]
>
>         at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1308)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1133)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
>
> 2014-04-25 13:04:53,116 INFO [Thread-590]
> org.apache.hadoop.hdfs.DFSClient: Abandoning
> BP-1078665828-10.1.21.30-1394476780595:blk_1073840493_118558
>
> 2014-04-25 13:04:53,117 INFO [Thread-590]
> org.apache.hadoop.hdfs.DFSClient: Excluding datanode 10.1.21.10:50010
>
> 2014-04-25 13:06:33,056 WARN [ResponseProcessor for block
> BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012]
> org.apache.hadoop.hdfs.DFSClient: DFSOutputStream ResponseProcessor
> exception  for block
> BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012
>
> java.net.SocketTimeoutException: 65000 millis timeout while waiting for
> channel to be ready for read. ch :
> java.nio.channels.SocketChannel[connected local=/10.1.21.10:49537 remote=/
> 10.1.21.12:50010]
>
>         at
> org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
>
>         at
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>
>         at
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>
>         at
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
>
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>
>         at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
>
>         at
> org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:721)
>
> 2014-04-25 13:07:18,519 ERROR [main]
> org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException
> as:mapred (auth:SIMPLE) cause:java.io.IOException: All datanodes
> 10.1.21.12:50010 are bad. Aborting...
>
> 2014-04-25 13:07:19,559 WARN [main] org.apache.hadoop.mapred.YarnChild:
> Exception running child : java.io.IOException: All datanodes
> 10.1.21.12:50010 are bad. Aborting...
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1008)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)
>
>
>
> …
>
> 2014-04-25 13:32:40,197 ERROR [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:mapred (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
>
>         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
>
>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
>
>         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
>
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
>
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
>
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
>
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>
>         at java.security.AccessController.doPrivileged(Native Method)
>
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
>
> …
>
> 2014-04-25 13:32:40,198 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
>
>         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
>
>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
>
>         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
>
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
>
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
>
>
>

>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
>
>         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
>
>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
>
>         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
>
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
>
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
>
>
>

Re: Map Reduce Error

Posted by divye sheth <di...@gmail.com>.
Hi Eric,

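IMHO you can work around this by increasing the xcievers count in
hdfs-site.xml, though this might cost you some performance. The DataNode log
you posted shows the limit is already 4096, so the value needs to be higher
than the one in this example: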

<property>
<name>dfs.datanode.max.xcievers</name>
<value>4096</value>
</property>

To get a better understanding of how xcievers work, go through this link:
http://blog.cloudera.com/blog/2012/03/hbase-hadoop-xceivers/

Also make sure your code is not leaking open file descriptors, i.e. that
every output it opens is eventually closed.
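
As a minimal sketch of what I mean by not leaking descriptors (plain Java,
not the Hadoop API): every output opened for a destination is tracked and
closed from one place. OutputRegistry and WriterFactory are made-up names
used only for illustration; I am assuming one writer per destination file.

```java
import java.io.IOException;
import java.io.Writer;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not a Hadoop class): keeps one open writer per
 * destination and closes all of them from a single close() call.
 */
final class OutputRegistry implements AutoCloseable {

    /** Hypothetical factory that opens the writer for a destination name. */
    interface WriterFactory {
        Writer open(String destination) throws IOException;
    }

    private final WriterFactory factory;
    private final Map<String, Writer> openWriters = new LinkedHashMap<>();

    OutputRegistry(WriterFactory factory) {
        this.factory = factory;
    }

    /** Appends one line to the destination, opening its writer on first use. */
    void write(String destination, String line) throws IOException {
        Writer writer = openWriters.get(destination);
        if (writer == null) {
            writer = factory.open(destination);
            openWriters.put(destination, writer);
        }
        writer.write(line);
        writer.write('\n');
    }

    /** Number of handles currently held open. */
    int openCount() {
        return openWriters.size();
    }

    /** Closes every writer, even if one close fails, then rethrows the first failure. */
    @Override
    public void close() throws IOException {
        IOException firstFailure = null;
        for (Writer writer : openWriters.values()) {
            try {
                writer.close();
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        openWriters.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
```

Used in a try-with-resources block, the handles are released even when a
write throws. In a reducer that uses MultipleOutputs, the equivalent step is
calling MultipleOutputs.close() from cleanup().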

Thanks
Divye Sheth


On Mon, Apr 28, 2014 at 9:09 PM, Wright, Eric <
ewright@soleocommunications.com> wrote:

> Currently, we are working on setting up a production Hadoop cluster. We
> have a smaller cluster of four nodes we are working on for development
> (nn, snn, and two datanodes). At a high level, we use Flume to load our data
> into HDFS. Because Flume may resend data on error (a consequence of how it
> implements its reliability guarantees), it is possible to get
> duplicate records. In our case, our batch size is 100, so we occasionally
> see 100 records duplicated. To remedy this, we de-duplicate the
> data using a MapReduce job.
>
>
>
> For this job, the mapper has the following generic parameters
> <LongWritable, Text, Text, Text>
>
>
>
> LongWritable – byte offset from beginning of file
>
> Text – Line of text currently being processed
>
> Text – Line of text currently being processed
>
> Text – Output File Indication
>
>
>
> And the reducer has the following generic parameters <Text, Text,
> NullWritable, Text>
>
> Text – Line of text currently being processed
>
> Text – Output File Indication
>
> NullWritable – Empty key
>
> Text – Line of text currently being processed
>
>
>
> We read the standard LongWritable, Text input key and value types provided
> by TextInputFormat coming into the mapper. We emit
> from the mapper the value as the key (i.e. the full text string passed in
> as the value to the mapper). This means that the reducer
> will receive as input Text, Text. The reducer key is the string of text,
> and the value is a destination file. The reducer uses the value it received
> from the mapper to determine an output file, as it uses MultipleOutputs.
>
>
>
> So, our map phase reads the strings, pushes the lines out as the keys so
> that the sort and partition phase send any duplicate lines to a single
> reducer, and then the reducer writes no more than one key (line) to its
> output file.
>
>
>
> Again, this does work at smaller scales, but when we increase our payload,
> we start getting numerous IO errors. It fails when we start processing tens
> of GB of data. I have tried numerous different things and believe that either
> our configuration is an issue or we are doing something fundamentally
> wrong.
>
>
>
> Any advice or things to check would be greatly appreciated. I’d be happy
> to provide more information if it would help to diagnose the problem.
>
>
>
>
>
>
>
> *Currently collected information:*
>
>
>
> [analytics@bi-data-12 ~]$ hadoop version
>
> Hadoop 2.2.0.2.0.6.0-102
>
> Subversion git@github.com:hortonworks/hadoop.git -r
> 02ad68f19849a0a986dac36b705491f6791f9179
>
> Compiled by jenkins on 2014-01-21T00:56Z
>
> Compiled with protoc 2.5.0
>
> From source with checksum 66f6c486e27479105979740178fbf0
>
> This command was run using
> /usr/lib/hadoop/hadoop-common-2.2.0.2.0.6.0-102.jar
>
> [analytics@bi-data-12 ~]$
>
>
>
>
>
> From the datanode service logs:
>
> 2014-04-25 13:23:00,349 ERROR datanode.DataNode
> (DataXceiver.java:writeBlock(540)) -
> DataNode{data=FSDataset{dirpath='[/disk1/hdfs/data/current,
> /disk2/hdfs/data/current, /disk3/hdfs/data/current,
> /disk4/hdfs/data/current]'}, localName='
> bi-data-12.soleocommunications.com:50010',
> storageID='DS-225705953-10.1.21.12-50010-1394477125650',
> xmitsInProgress=1}:Exception transfering block
> BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 to mirror
> 10.1.21.10:50010: java.io.EOFException: Premature EOF: no length prefix
> available
>
> 2014-04-25 13:23:00,349 WARN  datanode.DataNode
> (DataXceiverServer.java:run(155)) - bi-data-12.soleocommunications.com:50010
> :DataXceiverServer:
>
> java.io.IOException: Xceiver count 4097 exceeds the limit of concurrent
> xcievers: 4096
>
>         at
> org.apache.hadoop.hdfs.server.datanode.DataXceiverServer.run(DataXceiverServer.java:137)
>
>         at java.lang.Thread.run(Thread.java:722)
>
> 2014-04-25 13:23:00,349 INFO  datanode.DataNode
> (DataXceiver.java:writeBlock(600)) - opWriteBlock
> BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 received
> exception java.io.EOFException: Premature EOF: no length prefix available
>
> 2014-04-25 13:23:00,350 ERROR datanode.DataNode
> (DataXceiver.java:run(225)) - bi-data-12.soleocommunications.com:50010:DataXceiver
> error processing WRITE_BLOCK operation  src: /10.1.21.12:37307 dest: /
> 10.1.21.12:50010
>
> java.io.EOFException: Premature EOF: no length prefix available
>
>         at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)
>
>         at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:511)
>
>         at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:115)
>
>         at
> org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:68)
>
>         at
> org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:221)
>
>         at java.lang.Thread.run(Thread.java:722)
>
> 2014-04-25 13:23:00,352 INFO  datanode.DataNode
> (DataXceiver.java:writeBlock(432)) - Receiving
> BP-1078665828-10.1.21.30-1394476780595:blk_1073845357_123422 src: /
> 10.1.21.12:37311 dest: /10.1.21.12:50010
>
>
>
>
>
>
>
> From the map job logs we see exceptions like:
>
> 2014-04-25 13:04:16,621 INFO [Thread-147] org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
>
> java.io.IOException: Connection reset by peer
>
>         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>
>         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>
>         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:218)
>
>         at sun.nio.ch.IOUtil.read(IOUtil.java:191)
>
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:359)
>
>         at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
>
>         at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
>
>         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>
>         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>
>         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
>
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>
>         at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
>
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1160)
>
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
>
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
>
> …
>
> 2014-04-25 13:04:53,116 INFO [Thread-590]
> org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
>
> org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while
> waiting for channel to be ready for connect. ch :
> java.nio.channels.SocketChannel[connection-pending local=/10.1.21.10:43973remote=/
> 10.1.21.10:50010]
>
>         at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1308)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1133)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
>
> 2014-04-25 13:04:53,116 INFO [Thread-590]
> org.apache.hadoop.hdfs.DFSClient: Abandoning
> BP-1078665828-10.1.21.30-1394476780595:blk_1073840493_118558
>
> 2014-04-25 13:04:53,117 INFO [Thread-590]
> org.apache.hadoop.hdfs.DFSClient: Excluding datanode 10.1.21.10:50010
>
> 2014-04-25 13:06:33,056 WARN [ResponseProcessor for block
> BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012]
> org.apache.hadoop.hdfs.DFSClient: DFSOutputStream ResponseProcessor
> exception  for block
> BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012
>
> java.net.SocketTimeoutException: 65000 millis timeout while waiting for
> channel to be ready for read. ch :
> java.nio.channels.SocketChannel[connected local=/10.1.21.10:49537 remote=/
> 10.1.21.12:50010]
>
>         at
> org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
>
>         at
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>
>         at
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>
>         at
> org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
>
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>
>         at java.io.FilterInputStream.read(FilterInputStream.java:83)
>
>         at
> org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
>
>         at
> org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:721)
>
> 2014-04-25 13:07:18,519 ERROR [main]
> org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException
> as:mapred (auth:SIMPLE) cause:java.io.IOException: All datanodes
> 10.1.21.12:50010 are bad. Aborting...
>
> 2014-04-25 13:07:19,559 WARN [main] org.apache.hadoop.mapred.YarnChild:
> Exception running child : java.io.IOException: All datanodes
> 10.1.21.12:50010 are bad. Aborting...
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1008)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
>
>         at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)
>
>
>
> …
>
> 2014-04-25 13:32:40,197 ERROR [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:mapred (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
>
>         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
>
>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
>
>         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
>
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
>
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
>
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
>
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>
>         at java.security.AccessController.doPrivileged(Native Method)
>
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
>
> …
>
> 2014-04-25 13:32:40,198 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
>
>         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
>
>         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
>
>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
>
>         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
>
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
>
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
>
>
>
