Posted to user@giraph.apache.org by Yuanyuan Tian <yt...@us.ibm.com> on 2012/07/05 20:08:03 UTC
Re: wierd communication errors
I have tried mvn -Dgiraph.useNetty=true compile. The job still failed with
the same error (see below). It doesn't look like Netty is being used
at all. Did I miss anything?
java.lang.IllegalStateException: flush: Got ExecutionException
at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1102)
at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:968)
at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:613)
at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:657)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:253)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.IOException: Call to idp28.almaden.ibm.com/172.16.0.28:30051 failed on local exception: java.io.IOException: Connection reset by peer
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
at java.util.concurrent.FutureTask.get(FutureTask.java:83)
at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1097)
... 10 more
Caused by: java.lang.RuntimeException: java.io.IOException: Call to idp28.almaden.ibm.com/172.16.0.28:30051 failed on local exception: java.io.IOException: Connection reset by peer
at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:368)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Call to idp28.almaden.ibm.com/172.16.0.28:30051 failed on local exception: java.io.IOException: Connection reset by peer
at org.apache.hadoop.ipc.Client.wrapException(Client.java:1065)
at org.apache.hadoop.ipc.Client.call(Client.java:1033)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:224)
at $Proxy3.putVertexIdMessagesList(Unknown Source)
at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:328)
... 6 more
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:29)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:72)
at sun.nio.ch.IOUtil.write(IOUtil.java:43)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:334)
at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:55)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:146)
at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:107)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
at java.io.DataOutputStream.write(DataOutputStream.java:90)
at org.apache.hadoop.ipc.Client$Connection.sendParam(Client.java:745)
at org.apache.hadoop.ipc.Client.call(Client.java:1011)
... 9 more
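The trace above nests the real failure several levels deep (IllegalStateException, then ExecutionException, then RuntimeException, then two IOExceptions). Two details are visible only at the bottom: the failing frames are in BasicRPCCommunications and org.apache.hadoop.ipc.Client, i.e. Hadoop RPC rather than Netty, and the innermost frame is FileDispatcher.write0, so the reset happened while sending (the earlier traces in this thread fail in read0, while waiting for a reply). A small helper that walks the cause chain makes that bottom layer easy to log; RootCause is a hypothetical name, not a Giraph or Hadoop class.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ExecutionException;

// Hypothetical helper (not part of Giraph): finds the innermost cause of a wrapped exception.
final class RootCause {
    private RootCause() {}

    /** Follows getCause() to the end of the chain, stopping early if the chain loops. */
    static Throwable of(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<Throwable, Boolean>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Rebuild the same nesting as the trace: ISE -> ExecutionException -> RuntimeException -> IOException -> IOException
        IOException reset = new IOException("Connection reset by peer");
        IOException call = new IOException(
            "Call to idp28.almaden.ibm.com/172.16.0.28:30051 failed on local exception: " + reset, reset);
        RuntimeException wrapped = new RuntimeException(call);
        ExecutionException exec = new ExecutionException(wrapped);
        IllegalStateException top = new IllegalStateException("flush: Got ExecutionException", exec);
        System.out.println(RootCause.of(top)); // java.io.IOException: Connection reset by peer
    }
}
```

Logging the root cause together with the peer address from the "Call to ..." layer is usually enough to tell whether the same host keeps resetting connections.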
Yuanyuan
From: Avery Ching <ac...@apache.org>
To: user@giraph.apache.org
Date: 06/29/2012 05:41 PM
Subject: Re: wierd communication errors
Please try -Dgiraph.useNetty=true
On 6/29/12 11:35 AM, Yuanyuan Tian wrote:
Hi Avery,
I have a better understanding of the problem now. I think I hit the
scalability limit of Giraph. I was running 90 workers on a 16-node
cluster (each node can run 7 concurrent mappers). So, on average each node
ran 5 or 6 workers, with each worker maintaining 89 RPC connections. Plus,
each worker in my job was sending a lot of messages in each iteration. As
a result, the RPC connections became very unstable. When I reduced the
number of workers to 40 and the number of concurrent mappers per node
to 4, the job ran without any problem.
I think my experience revealed two limitations of the current Giraph:
- Scalability: with n workers in a Giraph job, each worker maintains
(n-1) RPC connections, so there are n*(n-1) RPC connections in total. As
n increases, the number of RPC connections quickly exceeds the capacity
of the current Giraph system.
- Fault Tolerance: when "connection reset by peer" or "broken pipe"
happens, the job just hangs, then eventually dies after a timeout. There
is no re-establishment of a connection or automatic restart of a failed
worker.
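To put the n*(n-1) figure into numbers for the two configurations in this thread, here is a back-of-the-envelope helper. It assumes one client connection from every worker to every other worker and workers spread evenly over the 16 nodes; both assumptions follow the description above and are not measurements. ConnectionCount is a hypothetical name.

```java
// Back-of-the-envelope arithmetic for an all-to-all worker topology (hypothetical helper).
final class ConnectionCount {
    private ConnectionCount() {}

    /** Connections each worker opens: one to every other worker. */
    static long perWorker(long workers) {
        return workers - 1;
    }

    /** Total worker-to-worker connections in the job: n * (n - 1). */
    static long total(long workers) {
        return workers * (workers - 1);
    }

    /** Average connections opened from each node, assuming workers are spread evenly. */
    static double perNode(long workers, long nodes) {
        return (double) total(workers) / nodes;
    }

    public static void main(String[] args) {
        for (long n : new long[] {40, 90}) {
            System.out.printf("n=%d: per worker=%d, total=%d, per node (16 nodes)=%.1f%n",
                n, perWorker(n), total(n), perNode(n, 16));
        }
    }
}
```

For 40 workers this gives 39 connections per worker and 1,560 in total; for 90 workers, 89 per worker and 8,010 in total, or roughly 500 opened from each node. Growing the worker count 2.25x multiplies the connection count by about 5.1, which is the quadratic growth the scalability point describes.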
I am very curious whether there is any plan to address these two
limitations.
BTW: I checked out the latest code from trunk. I did see some code using
Netty, but BasicRPCCommunications is still using Hadoop RPC. Is there
a knob or something I need to turn on to use Netty?
Yuanyuan
From: Yuanyuan Tian/Almaden/IBM
To: user@giraph.apache.org
Cc: user@giraph.apache.org
Date: 06/28/2012 10:16 AM
Subject: Re: wierd communication errors
I can try the Netty version then. But what causes the connection reset by
peer? A timeout? And if it happens, how can I reestablish the connection? I
could add some code to check the connection first and to reestablish it if
it was reset by peer before calling putVertexIdMessagesList.
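The reconnect idea above could look roughly like the following. This is a hypothetical sketch: RemoteCall, Reconnector and callWithRetry are illustrative names, not Giraph or Hadoop APIs, and the exponential backoff policy is an arbitrary choice. Rather than probing the connection before each call (it can still drop between the probe and the call), it reacts to the IOException, opens a fresh connection, and tries again a bounded number of times.

```java
import java.io.IOException;

// Hypothetical sketch: these types are illustrative and are not Giraph or Hadoop APIs.
final class ReconnectingCaller {
    /** A remote operation that may fail with an I/O error such as "Connection reset by peer". */
    interface RemoteCall<T> {
        T call() throws IOException;
    }

    /** Discards the broken connection (or RPC proxy) and opens a new one. */
    interface Reconnector {
        void reconnect() throws IOException;
    }

    private ReconnectingCaller() {}

    /**
     * Runs call up to maxAttempts times. After every IOException except the last one it
     * sleeps (doubling the delay each time) and asks reconnector for a fresh connection.
     */
    static <T> T callWithRetry(RemoteCall<T> call, Reconnector reconnector,
                               int maxAttempts, long initialBackoffMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long backoffMillis = initialBackoffMillis;
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (IOException e) {
                lastFailure = e;
                if (attempt == maxAttempts) {
                    break;                          // out of attempts: rethrow below
                }
                Thread.sleep(backoffMillis);
                backoffMillis *= 2;                 // exponential backoff
                try {
                    reconnector.reconnect();
                } catch (IOException reconnectFailure) {
                    e.addSuppressed(reconnectFailure); // keep the call failure as the primary error
                }
            }
        }
        throw lastFailure;
    }
}
```

One caveat for message sends: a reset can arrive after the peer has already processed the batch, so blindly resending can deliver the same messages twice. A real fix would need either idempotent sends or deduplication on the receiving worker, and, as noted in the reply below, reconnecting cannot help if the peer process itself has died.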
Yuanyuan
From: Avery Ching <ac...@apache.org>
To: user@giraph.apache.org
Date: 06/28/2012 01:20 AM
Subject: Re: wierd communication errors
In my testing, I found the netty implementation of Giraph (trunk) to be
more stable than Hadoop RPC. But you can't do too much (other than
reestablish the connection) when the connection is reset by peer.
Avery
On 6/28/12 12:29 AM, Yuanyuan Tian wrote:
I want to correct the errors I reported. The error should be as
follows. The errors in my previous email came from debug messages I added.
But the problem is the same: somehow a connection was reset by peer. I
tried several more times. Occasionally the job actually runs without a
problem, but more often it fails because of this connection reset problem.
I really don't have a clue what the problem is.
Yuanyuan
java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:859)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:253)
Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1085)
at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:1080)
at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:806)
at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:850)
... 7 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.IOException: Call to idp33.almaden.ibm.com/172.16.0.33:30054 failed on local exception: java.io.IOException: Connection reset by peer
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
at java.util.concurrent.FutureTask.get(FutureTask.java:83)
at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1080)
... 10 more
Caused by: java.lang.RuntimeException: java.io.IOException: Call to idp33.almaden.ibm.com/172.16.0.33:30054 failed on local exception: java.io.IOException: Connection reset by peer
at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:379)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Call to idp33.almaden.ibm.com/172.16.0.33:30054 failed on local exception: java.io.IOException: Connection reset by peer
at org.apache.hadoop.ipc.Client.wrapException(Client.java:1065)
at org.apache.hadoop.ipc.Client.call(Client.java:1033)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:224)
at $Proxy3.putVertexIdMessagesList(Unknown Source)
at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:339)
... 6 more
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
at sun.nio.ch.IOUtil.read(IOUtil.java:175)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:55)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
at java.io.FilterInputStream.read(FilterInputStream.java:116)
at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:343)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
at java.io.DataInputStream.readInt(DataInputStream.java:370)
at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:767)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:712)
From: Yuanyuan Tian/Almaden/IBM@IBMUS
To: user@giraph.apache.org
Cc: user@giraph.apache.org
Date: 06/27/2012 10:02 PM
Subject: Re: wierd communication errors
What do you mean by using Netty? I am not aware that Giraph uses Netty. I
am just using whatever the default Giraph release 1.0 uses.
Yuanyuan
From: Avery Ching <ac...@apache.org>
To: user@giraph.apache.org
Date: 06/27/2012 07:57 PM
Subject: Re: wierd communication errors
Same issue using netty as well?
On 6/27/12 6:14 PM, Yuanyuan Tian wrote:
Hi,
I was running a Giraph job where I constantly got the following
communication-related errors. The symptom is that in superstep 0, most of
the workers succeeded, but a few workers produced the errors below; the
machine that caused the connection reset was different for each failed
worker. To rule out a cluster setup problem, I also ran a different job,
and it worked fine. So the error must be caused by this particular Giraph
job. My Giraph job is just a normal message-propagation job, except that
the messages are not all of a single type. Therefore, I defined a special
message type (also copied in this email) that combines two different kinds
of messages: an integer message and a double array message. I have tried
all day but still couldn't pinpoint the source of the bug. Can anyone give
me some hints on what may have caused this error?
Thanks a lot,
java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:859)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:253)
Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1082)
at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:1080)
at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:806)
at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:850)
... 7 more
Caused by: java.util.concurrent.ExecutionException: java.lang.reflect.UndeclaredThrowableException
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
at java.util.concurrent.FutureTask.get(FutureTask.java:83)
at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1077)
... 10 more
Caused by: java.lang.reflect.UndeclaredThrowableException
at $Proxy3.getName(Unknown Source)
at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:335)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Call to idp35.almaden.ibm.com/172.16.0.35:30083 failed on local exception: java.io.IOException: Connection reset by peer
at org.apache.hadoop.ipc.Client.wrapException(Client.java:1065)
at org.apache.hadoop.ipc.Client.call(Client.java:1033)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:224)
... 8 more
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
at sun.nio.ch.IOUtil.read(IOUtil.java:175)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:55)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
at java.io.FilterInputStream.read(FilterInputStream.java:116)
at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:343)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
at java.io.DataInputStream.readInt(DataInputStream.java:370)
at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:767)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:712)
My special message type:
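import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// A tagged-union message: msgType selects which payload is serialized.
// Types 1 and 4 carry an int; types 2 and 3 carry a double[] of length MyVertex.K
// (MyVertex is the job's own vertex class, not shown here).
public class MyMessageWritable implements Writable {
    public byte msgType = 0;
    public long vertexID = -1;
    public double[] arrayMsg = null;
    public int intMsg = -1;

    // No-arg constructor so the framework can instantiate the class before readFields().
    public MyMessageWritable() {
    }

    public MyMessageWritable(long id, byte tp, int msg) {
        vertexID = id;
        msgType = tp;
        intMsg = msg;
    }

    public MyMessageWritable(long id, byte tp, double[] arr) {
        vertexID = id;
        msgType = tp;
        arrayMsg = arr;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        vertexID = in.readLong();
        msgType = in.readByte();
        switch (msgType) {
            case 1:
            case 4:
                intMsg = in.readInt();
                break;
            case 2:
            case 3:
                // Reuse the array if the object is recycled, but only when its length fits.
                if (arrayMsg == null || arrayMsg.length != MyVertex.K) {
                    arrayMsg = new double[MyVertex.K];
                }
                for (int i = 0; i < MyVertex.K; i++) {
                    arrayMsg[i] = in.readDouble();
                }
                break;
            default:
                throw new IOException("message type invalid: " + msgType);
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Must write exactly the fields, in the same order, that readFields() reads.
        out.writeLong(vertexID);
        out.writeByte(msgType);
        switch (msgType) {
            case 1:
            case 4:
                out.writeInt(intMsg);
                break;
            case 2:
            case 3:
                if (arrayMsg == null) {
                    throw new IOException("array message is null");
                }
                for (int i = 0; i < MyVertex.K; i++) {
                    out.writeDouble(arrayMsg[i]);
                }
                break;
            default:
                throw new IOException("message type invalid: " + msgType);
        }
    }
}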
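When a custom message type is suspected, a quick round-trip test of write() and readFields() rules out the most common Writable bug: a reader that consumes a different number of bytes than the writer produced, which would desynchronize everything after it in the stream. The sketch below is a standalone copy of the MyMessageWritable wire format, with the Hadoop Writable interface left out and a hypothetical K = 3 in place of MyVertex.K; TaggedMessage, roundTrip and serializedSize are illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Standalone stand-in for MyMessageWritable: same wire format, no Hadoop dependency.
final class TaggedMessage {
    static final int K = 3; // hypothetical; the original uses MyVertex.K

    byte msgType = 0;
    long vertexID = -1;
    double[] arrayMsg = null;
    int intMsg = -1;

    void write(DataOutput out) throws IOException {
        out.writeLong(vertexID);
        out.writeByte(msgType);
        switch (msgType) {
            case 1: case 4:
                out.writeInt(intMsg);
                break;
            case 2: case 3:
                if (arrayMsg == null) throw new IOException("array message is null");
                for (int i = 0; i < K; i++) out.writeDouble(arrayMsg[i]);
                break;
            default:
                throw new IOException("message type invalid: " + msgType);
        }
    }

    void readFields(DataInput in) throws IOException {
        vertexID = in.readLong();
        msgType = in.readByte();
        switch (msgType) {
            case 1: case 4:
                intMsg = in.readInt();
                break;
            case 2: case 3:
                if (arrayMsg == null || arrayMsg.length != K) arrayMsg = new double[K];
                for (int i = 0; i < K; i++) arrayMsg[i] = in.readDouble();
                break;
            default:
                throw new IOException("message type invalid: " + msgType);
        }
    }

    /** Number of bytes write() produces: 8 (id) + 1 (type) + payload. */
    static int serializedSize(TaggedMessage m) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        m.write(out);
        out.flush();
        return bytes.size();
    }

    /** Writes m, reads it back into a fresh object, and fails if bytes are left unread. */
    static TaggedMessage roundTrip(TaggedMessage m) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        m.write(out);
        out.flush();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        TaggedMessage copy = new TaggedMessage();
        copy.readFields(in);
        if (in.available() != 0) {
            throw new IOException("readFields left " + in.available() + " unread bytes");
        }
        return copy;
    }
}
```

An int message should serialize to 13 bytes and an array message to 9 + 8*K bytes. If the round trip succeeds for every msgType the job sends, the message class is an unlikely culprit, which fits the worker-count explanation Yuanyuan reached on 6/29.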
Re: wierd communication errors
Posted by Alessandro Presta <al...@fb.com>.
useNetty is a runtime option, so you should pass it when running the job.
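`mvn -Dgiraph.useNetty=true compile` only sets a property for the Maven build; the job reads its configuration when it is launched, so the flag has to reach the job's own configuration at submit time. Hadoop's GenericOptionsParser (used by ToolRunner) does this for `-D property=value` arguments on the job's command line. The sketch below imitates that idea with java.util.Properties to show where the flag must end up; GenericDefines, parse and useNetty are hypothetical names, and treating a missing giraph.useNetty as false is an assumption drawn from this thread (Hadoop RPC was used when the flag was absent).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch of a launcher lifting "-D key=value" pairs into the job configuration.
final class GenericDefines {
    private GenericDefines() {}

    /** Copies "-D key=value" and "-Dkey=value" pairs into conf; returns the remaining args. */
    static List<String> parse(String[] args, Properties conf) {
        List<String> rest = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            String pair = null;
            if (arg.equals("-D") && i + 1 < args.length) {
                pair = args[++i];              // "-D key=value" (two tokens)
            } else if (arg.startsWith("-D") && arg.length() > 2) {
                pair = arg.substring(2);       // "-Dkey=value" (one token)
            }
            if (pair == null) {
                rest.add(arg);                 // not a define: left for the job itself
                continue;
            }
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("expected key=value after -D: " + pair);
            }
            conf.setProperty(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return rest;
    }

    /** Reads the flag from the job configuration; absent means false (assumption). */
    static boolean useNetty(Properties conf) {
        return Boolean.parseBoolean(conf.getProperty("giraph.useNetty", "false"));
    }
}
```

With the real GenericOptionsParser, generic options such as -D must come before the application-specific arguments on the command line.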