You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2008/06/06 07:38:54 UTC
svn commit: r663828 - in /hadoop/core/trunk: CHANGES.txt
src/java/org/apache/hadoop/ipc/Client.java
src/test/org/apache/hadoop/ipc/TestIPC.java
Author: hairong
Date: Thu Jun 5 22:38:54 2008
New Revision: 663828
URL: http://svn.apache.org/viewvc?rev=663828&view=rev
Log:
HADOOP-3455. Fix NPE in ipc.Client in case of connection failure and improve its synchronization. Contributed by Steve Loughran and Hairong Kuang.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663828&r1=663827&r2=663828&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jun 5 22:38:54 2008
@@ -473,6 +473,9 @@
HADOOP-3493. Fix TestStreamingFailure to use FileUtil.fullyDelete to
ensure correct cleanup. (Lohit Vijayarenu via acmurthy)
+ HADOOP-3455. Fix NPE in ipc.Client in case of connection failure and
+ improve its synchronization. (hairong)
+
Release 0.17.0 - 2008-05-18
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=663828&r1=663827&r2=663828&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java Thu Jun 5 22:38:54 2008
@@ -34,6 +34,9 @@
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
@@ -48,7 +51,6 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
@@ -65,7 +67,7 @@
private Class<?> valueClass; // class of call values
private int counter; // counter for call ids
- private boolean running = true; // true while client runs
+ private AtomicBoolean running = new AtomicBoolean(true); // if client runs
final private Configuration conf;
final private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
@@ -125,7 +127,7 @@
synchronized boolean isZeroReference() {
return refCount==0;
}
-
+
/** A call waiting for a value. */
private class Call {
int id; // call id
@@ -175,12 +177,13 @@
private class Connection extends Thread {
private ConnectionId remoteId;
private Socket socket = null; // connected socket
- private DataInputStream in;
- private DataOutputStream out;
+ private DataInputStream in;
+ private AtomicReference<DataOutputStream> out =
+ new AtomicReference<DataOutputStream>();
// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
- private long lastActivity = 0; // last I/O activity time
- private boolean shouldCloseConnection = false; // indicate if the connection is closed
+ private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
+ private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
private IOException closeException; // close reason
public Connection(InetSocketAddress address) throws IOException {
@@ -201,23 +204,25 @@
}
/** Update lastActivity with the current time. */
- private synchronized void touch() {
- touch(System.currentTimeMillis());
- }
-
- private synchronized void touch(long curTime) {
- lastActivity = curTime;
+ private void touch() {
+ lastActivity.set(System.currentTimeMillis());
}
- /** Add a call to this connection's call queue */
+ /**
+ * Add a call to this connection's call queue and notify
+ * a listener; synchronized.
+ * Returns false if called during shutdown.
+ * @param call to add
+ * @return true if the call was added.
+ */
private synchronized boolean addCall(Call call) {
- if (shouldCloseConnection)
+ if (shouldCloseConnection.get())
return false;
calls.put(call.id, call);
notify();
return true;
}
-
+
/** This class sends a ping to the remote side when timeout on
* reading. If no failure is detected, it retries until at least
* a byte is read.
@@ -233,7 +238,7 @@
* otherwise, throw the timeout exception.
*/
private void handleTimeout(SocketTimeoutException e) throws IOException {
- if (shouldCloseConnection || !running) {
+ if (shouldCloseConnection.get() || !running.get()) {
throw e;
} else {
sendPing();
@@ -243,6 +248,7 @@
/** Read a byte from the stream.
* Send a ping if timeout on read. Retries if no failure is detected
* until a byte is read.
+ * @throws IOException for any IO problem other than socket timeout
*/
public int read() throws IOException {
do {
@@ -258,7 +264,7 @@
* Send a ping if timeout on read. Retries if no failure is detected
* until a byte is read.
*
- * @Return the total number of bytes read; -1 if the connection is closed.
+ * @return the total number of bytes read; -1 if the connection is closed.
*/
public int read(byte[] buf, int off, int len) throws IOException {
do {
@@ -276,7 +282,7 @@
* the connection thread that waits for responses.
*/
private synchronized void setupIOstreams() {
- if (socket != null || shouldCloseConnection) {
+ if (socket != null || shouldCloseConnection.get()) {
return;
}
@@ -305,19 +311,19 @@
}
this.in = new DataInputStream(new BufferedInputStream
(new PingInputStream(NetUtils.getInputStream(socket))));
- this.out = new DataOutputStream
- (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
+ this.out.set(new DataOutputStream
+ (new BufferedOutputStream(NetUtils.getOutputStream(socket))));
writeHeader();
// update last activity time
touch();
+ // start the receiver thread after the socket connection has been set up
+ start();
} catch (IOException e) {
markClosed(e);
close();
}
- // start the receiver thread after the socket connection has been set up
- start();
}
/* Handle connection failures
@@ -325,7 +331,10 @@
* If the current number of retries is equal to the max number of retries,
* stop retrying and throw the exception; Otherwise backoff 1 second and
* try connecting again.
- *
+ *
+ * This method is only called from inside setupIOstreams(), which is
+ * synchronized. Hence the sleep happens while the lock is held; the lock is not released.
+ *
* @param curRetries current number of retries
* @param maxRetries max number of retries allowed
* @param ioe failure reason
@@ -361,6 +370,7 @@
* Out is not synchronized because only the first thread does this.
*/
private void writeHeader() throws IOException {
+ DataOutputStream out = this.out.get();
out.write(Server.HEADER.array());
out.write(Server.CURRENT_VERSION);
//When there are more fields we can have ConnectionHeader Writable.
@@ -379,8 +389,9 @@
* Return true if it is time to read a response; false otherwise.
*/
private synchronized boolean waitForWork() {
- if (calls.isEmpty() && !shouldCloseConnection && running) {
- long timeout = maxIdleTime-(System.currentTimeMillis()-lastActivity);
+ if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
+ long timeout = maxIdleTime-
+ (System.currentTimeMillis()-lastActivity.get());
if (timeout>0) {
try {
wait(timeout);
@@ -388,11 +399,11 @@
}
}
- if (!calls.isEmpty() && !shouldCloseConnection && running) {
+ if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
return true;
- } else if (shouldCloseConnection) {
+ } else if (shouldCloseConnection.get()) {
return false;
- } else if (!running) { //get stopped
+ } else if (!running.get()) { //get stopped
markClosed((IOException)new IOException().initCause(
new InterruptedException()));
return false;
@@ -411,9 +422,10 @@
*/
private synchronized void sendPing() throws IOException {
long curTime = System.currentTimeMillis();
- if ( curTime - lastActivity >= pingInterval) {
- touch(curTime);
+ if ( curTime - lastActivity.get() >= pingInterval) {
+ lastActivity.set(curTime);
synchronized (out) {
+ DataOutputStream out = this.out.get();
out.writeInt(PING_CALL_ID);
out.flush();
}
@@ -441,30 +453,36 @@
* threads.
*/
public void sendParam(Call call) {
- synchronized (this) {
- if (shouldCloseConnection) {
- return;
- }
+ if (shouldCloseConnection.get()) {
+ return;
}
+ DataOutputBuffer d=null;
try {
- synchronized (out) {
+ synchronized (this.out) {
+ DataOutputStream out = this.out.get();
+ if (out==null) return; // socket has closed
+
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
-
- DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
+
+ //for serializing the
//data to be written
+ d = new DataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
-
out.writeInt(dataLength); //first put the data length
out.write(data, 0, dataLength);//write the data
out.flush();
}
} catch(IOException e) {
markClosed(e);
+ } finally {
+ //the buffer is just an in-memory buffer, but it is still polite to
+ // close early
+ IOUtils.closeStream(d);
}
}
@@ -472,10 +490,8 @@
* Because only one receiver, so no synchronization on in.
*/
private void receiveResponse() {
- synchronized (this) {
- if (shouldCloseConnection) {
- return;
- }
+ if (shouldCloseConnection.get()) {
+ return;
}
touch();
@@ -502,8 +518,7 @@
}
private synchronized void markClosed(IOException e) {
- if (!shouldCloseConnection) {
- shouldCloseConnection = true;
+ if (shouldCloseConnection.compareAndSet(false, true)) {
closeException = e;
notifyAll();
}
@@ -511,7 +526,7 @@
/** Close the connection. */
private synchronized void close() {
- if (!shouldCloseConnection) {
+ if (!shouldCloseConnection.get()) {
LOG.error("The connection is not in the closed state");
return;
}
@@ -527,15 +542,17 @@
// close the socket and streams
IOUtils.closeStream(in);
- IOUtils.closeStream(out);
+ in = null;
+ IOUtils.closeStream(out.getAndSet(null));
IOUtils.closeSocket(socket);
+ socket = null;
// clean up all calls
if (closeException == null) {
if (!calls.isEmpty()) {
LOG.warn(
"A connection is closed for no cause and calls are not empty");
-
+
// clean up calls anyway
closeException = new IOException("Unexpected closed connection");
cleanupCalls();
@@ -544,7 +561,7 @@
// log the info
if (LOG.isDebugEnabled()) {
LOG.debug("closing ipc connection to " + remoteId.address + ": " +
- StringUtils.stringifyException(closeException));
+ closeException.getMessage(),closeException);
}
// cleanup calls
@@ -643,11 +660,10 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping client");
}
-
- if (running == false) {
+
+ if (!running.compareAndSet(true, false)) {
return;
}
- running = false;
// wake up all connections
synchronized (connections) {
@@ -716,8 +732,9 @@
Connection connection = getConnection(addresses[i], null, call);
connection.sendParam(call); // send each parameter
} catch (IOException e) {
+ // log errors
LOG.info("Calling "+addresses[i]+" caught: " +
- StringUtils.stringifyException(e)); // log errors
+ e.getMessage(),e);
results.size--; // wait for one fewer result
}
}
@@ -737,11 +754,9 @@
UserGroupInformation ticket,
Call call)
throws IOException {
- synchronized (this) {
- if (!running) {
- // the client is stopped
- throw new IOException("The client is stopped");
- }
+ if (!running.get()) {
+ // the client is stopped
+ throw new IOException("The client is stopped");
}
Connection connection;
/* we could avoid this allocation for each RPC by having a
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java?rev=663828&r1=663827&r2=663828&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java Thu Jun 5 22:38:54 2008
@@ -208,6 +208,19 @@
}
}
+ public void testStandAloneClient() throws Exception {
+ testParallel(10, false, 2, 4, 2, 4, 100);
+ Client client = new Client(LongWritable.class, conf);
+ boolean hasException = false;
+ try {
+ client.call(new LongWritable(RANDOM.nextLong()),
+ new InetSocketAddress("127.0.0.1", 10));
+ } catch (IOException e) {
+ hasException = true;
+ }
+ assertTrue (hasException);
+ }
+
public static void main(String[] args) throws Exception {
//new TestIPC("test").testSerial(5, false, 2, 10, 1000);