You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2008/09/20 01:12:22 UTC
svn commit: r697275 - in /hadoop/core/trunk: CHANGES.txt
src/core/org/apache/hadoop/ipc/Client.java src/test/findbugsExcludeFile.xml
Author: hairong
Date: Fri Sep 19 16:12:22 2008
New Revision: 697275
URL: http://svn.apache.org/viewvc?rev=697275&view=rev
Log:
HADOOP-4062. IPC client does not need to be synchronized on the output stream when a connection is closed. Contributed by Hairong Kuang.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
hadoop/core/trunk/src/test/findbugsExcludeFile.xml
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=697275&r1=697274&r2=697275&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 19 16:12:22 2008
@@ -396,6 +396,10 @@
HADOOP-2165. Augmented JobHistory to include the URIs to the tasks'
userlogs. (Vinod Kumar Vavilapalli via acmurthy)
+ HADOOP-4062. Remove the synchronization on the output stream when a
+ connection is closed and also remove an undesirable exception when
+ a client is stopped while there is no pending RPC request. (hairong)
+
OPTIMIZATIONS
HADOOP-3556. Removed lock contention in MD5Hash by changing the
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java?rev=697275&r1=697274&r2=697275&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java Fri Sep 19 16:12:22 2008
@@ -36,7 +36,6 @@
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
@@ -178,8 +177,8 @@
private ConnectionId remoteId;
private Socket socket = null; // connected socket
private DataInputStream in;
- private AtomicReference<DataOutputStream> out =
- new AtomicReference<DataOutputStream>();
+ private DataOutputStream out;
+
// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
@@ -311,8 +310,8 @@
}
this.in = new DataInputStream(new BufferedInputStream
(new PingInputStream(NetUtils.getInputStream(socket))));
- this.out.set(new DataOutputStream
- (new BufferedOutputStream(NetUtils.getOutputStream(socket))));
+ this.out = new DataOutputStream
+ (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
writeHeader();
// update last activity time
@@ -370,7 +369,6 @@
* Out is not synchronized because only the first thread does this.
*/
private void writeHeader() throws IOException {
- DataOutputStream out = this.out.get();
out.write(Server.HEADER.array());
out.write(Server.CURRENT_VERSION);
//When there are more fields we can have ConnectionHeader Writable.
@@ -403,13 +401,13 @@
return true;
} else if (shouldCloseConnection.get()) {
return false;
- } else if (!running.get()) { //get stopped
+ } else if (calls.isEmpty()) { // idle connection closed or stopped
+ markClosed(null);
+ return false;
+ } else { // get stopped but there are still pending requests
markClosed((IOException)new IOException().initCause(
new InterruptedException()));
return false;
- } else { // closed because it has been idle for more than maxIdleTime
- markClosed(null);
- return false;
}
}
@@ -425,7 +423,6 @@
if ( curTime - lastActivity.get() >= pingInterval) {
lastActivity.set(curTime);
synchronized (out) {
- DataOutputStream out = this.out.get();
out.writeInt(PING_CALL_ID);
out.flush();
}
@@ -460,9 +457,6 @@
DataOutputBuffer d=null;
try {
synchronized (this.out) {
- DataOutputStream out = this.out.get();
- if (out==null) return; // socket has closed
-
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
@@ -531,42 +525,37 @@
return;
}
- synchronized (out) {
- // release the resources
- // first thing to do;take the connection out of the connection list
- synchronized (connections) {
- if (connections.get(remoteId) == this) {
- connections.remove(remoteId);
- }
+ // release the resources
+ // first thing to do;take the connection out of the connection list
+ synchronized (connections) {
+ if (connections.get(remoteId) == this) {
+ connections.remove(remoteId);
}
+ }
- // close the socket and streams
- IOUtils.closeStream(in);
- in = null;
- IOUtils.closeStream(out.getAndSet(null));
- IOUtils.closeSocket(socket);
- socket = null;
-
- // clean up all calls
- if (closeException == null) {
- if (!calls.isEmpty()) {
- LOG.warn(
- "A connection is closed for no cause and calls are not empty");
-
- // clean up calls anyway
- closeException = new IOException("Unexpected closed connection");
- cleanupCalls();
- }
- } else {
- // log the info
- if (LOG.isDebugEnabled()) {
- LOG.debug("closing ipc connection to " + remoteId.address + ": " +
- closeException.getMessage(),closeException);
- }
+ // close the streams and therefore the socket
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
+
+ // clean up all calls
+ if (closeException == null) {
+ if (!calls.isEmpty()) {
+ LOG.warn(
+ "A connection is closed for no cause and calls are not empty");
- // cleanup calls
+ // clean up calls anyway
+ closeException = new IOException("Unexpected closed connection");
cleanupCalls();
}
+ } else {
+ // log the info
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closing ipc connection to " + remoteId.address + ": " +
+ closeException.getMessage(),closeException);
+ }
+
+ // cleanup calls
+ cleanupCalls();
}
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": closed");
Modified: hadoop/core/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/findbugsExcludeFile.xml?rev=697275&r1=697274&r2=697275&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/core/trunk/src/test/findbugsExcludeFile.xml Fri Sep 19 16:12:22 2008
@@ -19,4 +19,13 @@
<Field name="_jspx_dependants" />
<Bug pattern="UWF_UNWRITTEN_FIELD" />
</Match>
+ <!--
+ Inconsistent synchronization for Client.Connection.out is
+ intentional to make a connection to be closed instantly.
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.ipc.Client$Connection" />
+ <Field name="out" />
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
</FindBugsFilter>