You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/10/26 23:11:28 UTC
[1/6] hbase git commit: HBASE-13318 RpcServer.getListenerAddress
should handle when the accept channel is closed
Repository: hbase
Updated Branches:
refs/heads/0.98 f21501496 -> f321d7ec7
refs/heads/branch-1 c3d4d0233 -> 6fbcd0a2e
refs/heads/branch-1.0 54242171d -> bbb2f1b00
refs/heads/branch-1.1 9b71dac49 -> b75605817
refs/heads/branch-1.2 2f3e98b7e -> c706d42e9
refs/heads/master 928dade1d -> efb82957d
HBASE-13318 RpcServer.getListenerAddress should handle when the accept channel is closed
Conflicts:
hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f321d7ec
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f321d7ec
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f321d7ec
Branch: refs/heads/0.98
Commit: f321d7ec7f2aecfb443619b1fad1dffbf703c420
Parents: f215014
Author: Andrew Purtell <ap...@apache.org>
Authored: Mon Oct 26 14:31:28 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Oct 26 14:52:15 2015 -0700
----------------------------------------------------------------------
.../hbase/ipc/IntegrationTestRpcClient.java | 22 ++++++++++++++++----
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 11 ++++++----
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 17 ++++++++++-----
.../apache/hadoop/hbase/ipc/TestDelayedRpc.java | 21 +++++++++++++------
.../org/apache/hadoop/hbase/ipc/TestIPC.java | 10 +++++++--
.../hadoop/hbase/ipc/TestProtoBufRpc.java | 6 +++++-
.../hadoop/hbase/security/TestSecureRPC.java | 13 +++++++-----
.../security/token/TestTokenAuthentication.java | 6 +++++-
8 files changed, 78 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index 6dfef9c..4317815 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -161,9 +161,13 @@ public class IntegrationTestRpcClient {
TestRpcServer rpcServer = new TestRpcServer(conf);
rpcServer.start();
- rpcServers.put(rpcServer.getListenerAddress(), rpcServer);
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ rpcServers.put(address, rpcServer);
serverList.add(rpcServer);
- LOG.info("Started server: " + rpcServer.getListenerAddress());
+ LOG.info("Started server: " + address);
return rpcServer;
} finally {
lock.writeLock().unlock();
@@ -180,7 +184,13 @@ public class IntegrationTestRpcClient {
int size = rpcServers.size();
int rand = random.nextInt(size);
rpcServer = serverList.remove(rand);
- rpcServers.remove(rpcServer.getListenerAddress());
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ // Throw exception here. We can't remove this instance from the server map because
+ // we no longer have access to its map key
+ throw new IOException("Listener channel is closed");
+ }
+ rpcServers.remove(address);
if (rpcServer != null) {
stopServer(rpcServer);
@@ -298,8 +308,12 @@ public class IntegrationTestRpcClient {
TestRpcServer server = cluster.getRandomServer();
try {
User user = User.getCurrent();
+ InetSocketAddress address = server.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
ret = (EchoResponseProto)
- rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress(), 60000);
+ rpcClient.callBlockingMethod(md, null, param, ret, user, address, 60000);
} catch (Exception e) {
LOG.warn(e);
continue; // expected in case connection is closing or closed
http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 142f005..60619bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hbase.ipc;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -91,8 +92,9 @@ public class CallRunner {
TraceScope traceScope = null;
try {
if (!this.rpcServer.isStarted()) {
- throw new ServerNotRunningYetException("Server " + rpcServer.getListenerAddress()
- + " is not running yet");
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ throw new ServerNotRunningYetException("Server " +
+ (address != null ? address : "(channel closed)") + " is not running yet");
}
if (call.tinfo != null) {
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
@@ -134,9 +136,10 @@ public class CallRunner {
throw e;
}
} catch (ClosedChannelException cce) {
+ InetSocketAddress address = rpcServer.getListenerAddress();
RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
- "this means that the server " + rpcServer.getListenerAddress() + " was processing a " +
- "request but the client went away. The error message was: " +
+ "this means that the server " + (address != null ? address : "(channel closed)") +
+ " was processing a request but the client went away. The error message was: " +
cce.getMessage());
} catch (Exception e) {
RpcServer.LOG.warn(Thread.currentThread().getName()
http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index c37411b..ceb39f9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1760,8 +1760,9 @@ public class RpcServer implements RpcServerInterface {
responder, totalRequestSize, null, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
+ InetSocketAddress address = getListenerAddress();
setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
- "Call queue is full on " + getListenerAddress() +
+ "Call queue is full on " + (address != null ? address : "(channel closed)") +
", is hbase.ipc.server.max.callqueue.size too small?");
responder.doRespond(callTooBig);
return;
@@ -1789,8 +1790,9 @@ public class RpcServer implements RpcServerInterface {
buf, offset, buf.length);
}
} catch (Throwable t) {
- String msg = getListenerAddress() + " is unable to read call parameter from client " +
- getHostAddress();
+ InetSocketAddress address = getListenerAddress();
+ String msg = (address != null ? address : "(channel closed)") +
+ " is unable to read call parameter from client " + getHostAddress();
LOG.warn(msg, t);
metrics.exception(t);
@@ -2244,11 +2246,16 @@ public class RpcServer implements RpcServerInterface {
}
/**
- * Return the socket (ip+port) on which the RPC server is listening to.
- * @return the socket (ip+port) on which the RPC server is listening to.
+ * Return the socket (ip+port) on which the RPC server is listening. May return null if
+ * the listener channel is closed.
+ * @return the socket (ip+port) on which the RPC server is listening, or null if this
+ * information cannot be determined
*/
@Override
public synchronized InetSocketAddress getListenerAddress() {
+ if (listener == null) {
+ return null;
+ }
return listener.getAddress();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
index abe1621..9c2f6ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
@@ -90,9 +90,12 @@ public class TestDelayedRpc {
rpcServer.start();
RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), RPC_CLIENT_TIMEOUT);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -171,9 +174,12 @@ public class TestDelayedRpc {
rpcServer.start();
RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), RPC_CLIENT_TIMEOUT);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -294,9 +300,12 @@ public class TestDelayedRpc {
rpcServer.start();
RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), 1000);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index 7dcc37a..9a36440 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -227,9 +227,12 @@ public class TestIPC {
for (int i = 0; i < count; i++) cells.add(CELL);
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
Pair<Message, CellScanner> r = client.call(md, param, CellUtil.createCellScanner(cells),
md.getOutputType().toProto(), User.getCurrent(), address, 0);
int index = 0;
@@ -322,9 +325,12 @@ public class TestIPC {
rm.add(p);
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (int i = 0; i < cycles; i++) {
List<CellScannable> cells = new ArrayList<CellScannable>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
index 63395af..f03ee19 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
@@ -100,7 +100,11 @@ public class TestProtoBufRpc {
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
new InetSocketAddress(ADDRESS, PORT), conf,
new FifoRpcScheduler(conf, 10));
- this.isa = server.getListenerAddress();
+ InetSocketAddress address = server.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ this.isa = address;
this.server.start();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
index 0beb6dc..735a121 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
@@ -22,11 +22,11 @@ import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileF
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.isKerberosPropertySetted;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assume.assumeTrue;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@@ -100,10 +100,13 @@ public class TestSecureRPC {
rpcServer.start();
RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
- User.getCurrent(), 1000);
+ ServerName.valueOf(address.getHostName(), address.getPort(),
+ System.currentTimeMillis()), User.getCurrent(), 1000);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
List<Integer> results = new ArrayList<Integer>();
@@ -117,4 +120,4 @@ public class TestSecureRPC {
rpcClient.stop();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f321d7ec/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index b5ae760..5c505df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -137,7 +137,11 @@ public class TestTokenAuthentication {
AuthenticationProtos.AuthenticationService.BlockingInterface.class));
this.rpcServer =
new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
- this.isa = this.rpcServer.getListenerAddress();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ this.isa = address;
this.sleeper = new Sleeper(1000, this);
}
[5/6] hbase git commit: HBASE-13318 RpcServer.getListenerAddress
should handle when the accept channel is closed
Posted by ap...@apache.org.
HBASE-13318 RpcServer.getListenerAddress should handle when the accept channel is closed
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c706d42e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c706d42e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c706d42e
Branch: refs/heads/branch-1.2
Commit: c706d42e9e260bfb789bd4ada3ab5c2f5bfa0ead
Parents: 2f3e98b
Author: Andrew Purtell <ap...@apache.org>
Authored: Mon Oct 26 14:31:28 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Oct 26 15:11:11 2015 -0700
----------------------------------------------------------------------
.../hbase/ipc/IntegrationTestRpcClient.java | 22 ++++++++++++++++----
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 11 ++++++----
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 17 ++++++++++-----
.../hbase/regionserver/RSRpcServices.java | 6 +++++-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 22 +++++++++++++++-----
.../apache/hadoop/hbase/ipc/TestAsyncIPC.java | 9 ++++++++
.../apache/hadoop/hbase/ipc/TestDelayedRpc.java | 21 +++++++++++++------
.../org/apache/hadoop/hbase/ipc/TestIPC.java | 5 ++++-
.../hadoop/hbase/ipc/TestProtoBufRpc.java | 6 +++++-
.../hbase/ipc/TestRpcHandlerException.java | 7 +++++--
.../TestRSKilledWhenInitializing.java | 8 +++++--
.../hadoop/hbase/security/TestSecureRPC.java | 9 +++++---
.../security/token/TestTokenAuthentication.java | 6 +++++-
13 files changed, 114 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c706d42e/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index 09de871..c28f3e6 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -168,9 +168,13 @@ public class IntegrationTestRpcClient {
TestRpcServer rpcServer = new TestRpcServer(conf);
rpcServer.start();
- rpcServers.put(rpcServer.getListenerAddress(), rpcServer);
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ rpcServers.put(address, rpcServer);
serverList.add(rpcServer);
- LOG.info("Started server: " + rpcServer.getListenerAddress());
+ LOG.info("Started server: " + address);
return rpcServer;
} finally {
lock.writeLock().unlock();
@@ -187,7 +191,13 @@ public class IntegrationTestRpcClient {
int size = rpcServers.size();
int rand = random.nextInt(size);
rpcServer = serverList.remove(rand);
- rpcServers.remove(rpcServer.getListenerAddress());
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ // Throw exception here. We can't remove this instance from the server map because
+ // we no longer have access to its map key
+ throw new IOException("Listener channel is closed");
+ }
+ rpcServers.remove(address);
if (rpcServer != null) {
stopServer(rpcServer);
@@ -305,8 +315,12 @@ public class IntegrationTestRpcClient {
TestRpcServer server = cluster.getRandomServer();
try {
User user = User.getCurrent();
+ InetSocketAddress address = server.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
ret = (EchoResponseProto)
- rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress());
+ rpcClient.callBlockingMethod(md, null, param, ret, user, address);
} catch (Exception e) {
LOG.warn(e);
continue; // expected in case connection is closing or closed
http://git-wip-us.apache.org/repos/asf/hbase/blob/c706d42e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 38b7c91..ede4b4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hbase.ipc;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import org.apache.commons.logging.Log;
@@ -96,8 +97,9 @@ public class CallRunner {
TraceScope traceScope = null;
try {
if (!this.rpcServer.isStarted()) {
- throw new ServerNotRunningYetException("Server " + rpcServer.getListenerAddress()
- + " is not running yet");
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ throw new ServerNotRunningYetException("Server " +
+ (address != null ? address : "(channel closed)") + " is not running yet");
}
if (call.tinfo != null) {
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
@@ -143,9 +145,10 @@ public class CallRunner {
throw e;
}
} catch (ClosedChannelException cce) {
+ InetSocketAddress address = rpcServer.getListenerAddress();
RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
- "this means that the server " + rpcServer.getListenerAddress() + " was processing a " +
- "request but the client went away. The error message was: " +
+ "this means that the server " + (address != null ? address : "(channel closed)") +
+ " was processing a request but the client went away. The error message was: " +
cce.getMessage());
} catch (Exception e) {
RpcServer.LOG.warn(Thread.currentThread().getName()
http://git-wip-us.apache.org/repos/asf/hbase/blob/c706d42e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index ac428da..186e2d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1797,8 +1797,9 @@ public class RpcServer implements RpcServerInterface {
responder, totalRequestSize, null, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
+ InetSocketAddress address = getListenerAddress();
setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
- "Call queue is full on " + getListenerAddress() +
+ "Call queue is full on " + (address != null ? address : "(channel closed)") +
", is hbase.ipc.server.max.callqueue.size too small?");
responder.doRespond(callTooBig);
return;
@@ -1826,8 +1827,9 @@ public class RpcServer implements RpcServerInterface {
buf, offset, buf.length);
}
} catch (Throwable t) {
- String msg = getListenerAddress() + " is unable to read call parameter from client " +
- getHostAddress();
+ InetSocketAddress address = getListenerAddress();
+ String msg = (address != null ? address : "(channel closed)") +
+ " is unable to read call parameter from client " + getHostAddress();
LOG.warn(msg, t);
metrics.exception(t);
@@ -2250,11 +2252,16 @@ public class RpcServer implements RpcServerInterface {
}
/**
- * Return the socket (ip+port) on which the RPC server is listening to.
- * @return the socket (ip+port) on which the RPC server is listening to.
+ * Return the socket (ip+port) on which the RPC server is listening. May return null if
+ * the listener channel is closed.
+ * @return the socket (ip+port) on which the RPC server is listening, or null if this
+ * information cannot be determined
*/
@Override
public synchronized InetSocketAddress getListenerAddress() {
+ if (listener == null) {
+ return null;
+ }
return listener.getAddress();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c706d42e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 8822696..47b373a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -890,8 +890,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
// Set our address, however we need the final port that was given to rpcServer
- isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort());
+ isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
rpcServer.setErrorHandler(this);
rs.setName(name);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c706d42e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index d427419..dffd8e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -159,10 +159,13 @@ public abstract class AbstractTestIPC {
TestRpcServer rpcServer = new TestRpcServer();
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
final String message = "hello";
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
Pair<Message, CellScanner> r =
client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
@@ -200,12 +203,14 @@ public abstract class AbstractTestIPC {
TestRpcServer rpcServer = new TestRpcServer();
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
Pair<Message, CellScanner> r =
client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
@@ -231,9 +236,12 @@ public abstract class AbstractTestIPC {
AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
client.call(null, md, param, null, User.getCurrent(), address,
new MetricsConnection.CallStats());
fail("Expected an exception to have been thrown!");
@@ -258,10 +266,14 @@ public abstract class AbstractTestIPC {
verify(scheduler).start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (int i = 0; i < 10; i++) {
client.call(new PayloadCarryingRpcController(
CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
- md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(),
+ md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
}
verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
http://git-wip-us.apache.org/repos/asf/hbase/blob/c706d42e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
index 8921e89..d9b3e49 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
@@ -156,6 +156,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
@@ -192,6 +195,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
@@ -257,6 +263,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
for (int i = 0; i < cycles; i++) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c706d42e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
index deee717..41ee4cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
@@ -91,9 +91,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), RPC_CLIENT_TIMEOUT);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -173,9 +176,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), RPC_CLIENT_TIMEOUT);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -297,9 +303,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), 1000);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c706d42e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index 227f91a..d3dbd33 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -125,9 +125,12 @@ public class TestIPC extends AbstractTestIPC {
rm.add(p);
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (int i = 0; i < cycles; i++) {
List<CellScannable> cells = new ArrayList<CellScannable>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c706d42e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
index fc2734f..ffb3927 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
@@ -100,7 +100,11 @@ public class TestProtoBufRpc {
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
new InetSocketAddress(ADDRESS, PORT), conf,
new FifoRpcScheduler(conf, 10));
- this.isa = server.getListenerAddress();
+ InetSocketAddress address = server.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ this.isa = address;
this.server.start();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c706d42e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
index dbf119d..03e9e4e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -179,9 +179,12 @@ public class TestRpcHandlerException {
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
PayloadCarryingRpcController controller =
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
-
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
- rpcServer.getListenerAddress(), new MetricsConnection.CallStats());
+ address, new MetricsConnection.CallStats());
} catch (Throwable e) {
assert(abortable.isAborted() == true);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c706d42e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
index 4ad2c31..9a48db7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -114,13 +115,16 @@ public class TestRSKilledWhenInitializing {
@Override
protected void handleReportForDutyResponse(RegionServerStartupResponse c) throws IOException {
if (firstRS.getAndSet(false)) {
+ InetSocketAddress address = super.getRpcServer().getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (NameStringPair e : c.getMapEntriesList()) {
String key = e.getName();
// The hostname the master sees us as.
if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
String hostnameFromMasterPOV = e.getValue();
- assertEquals(super.getRpcServer().getListenerAddress().getHostName(),
- hostnameFromMasterPOV);
+ assertEquals(address.getHostName(), hostnameFromMasterPOV);
}
}
while (!masterActive) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c706d42e/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
index b4dd62b..a940408 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
@@ -142,11 +142,14 @@ public class TestSecureRPC {
RpcClient rpcClient =
RpcClientFactory.createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel =
rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), rpcServer
- .getListenerAddress().getPort(), System.currentTimeMillis()), User.getCurrent(),
- 5000);
+ ServerName.valueOf(address.getHostName(), address.getPort(),
+ System.currentTimeMillis()), User.getCurrent(), 5000);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
List<Integer> results = new ArrayList<Integer>();
http://git-wip-us.apache.org/repos/asf/hbase/blob/c706d42e/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index 423cd1d..50bc589 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -141,7 +141,11 @@ public class TestTokenAuthentication {
AuthenticationProtos.AuthenticationService.BlockingInterface.class));
this.rpcServer =
new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
- this.isa = this.rpcServer.getListenerAddress();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ this.isa = address;
this.sleeper = new Sleeper(1000, this);
}
[2/6] hbase git commit: HBASE-13318 RpcServer.getListenerAddress
should handle when the accept channel is closed
Posted by ap...@apache.org.
HBASE-13318 RpcServer.getListenerAddress should handle when the accept channel is closed
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6fbcd0a2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6fbcd0a2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6fbcd0a2
Branch: refs/heads/branch-1
Commit: 6fbcd0a2e882a6e60ae0b658bc6620b282d55350
Parents: c3d4d02
Author: Andrew Purtell <ap...@apache.org>
Authored: Mon Oct 26 14:31:28 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Oct 26 15:11:08 2015 -0700
----------------------------------------------------------------------
.../hbase/ipc/IntegrationTestRpcClient.java | 22 ++++++++++++++++----
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 11 ++++++----
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 17 ++++++++++-----
.../hbase/regionserver/RSRpcServices.java | 6 +++++-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 22 +++++++++++++++-----
.../apache/hadoop/hbase/ipc/TestAsyncIPC.java | 9 ++++++++
.../apache/hadoop/hbase/ipc/TestDelayedRpc.java | 21 +++++++++++++------
.../org/apache/hadoop/hbase/ipc/TestIPC.java | 5 ++++-
.../hadoop/hbase/ipc/TestProtoBufRpc.java | 6 +++++-
.../hbase/ipc/TestRpcHandlerException.java | 7 +++++--
.../TestRSKilledWhenInitializing.java | 8 +++++--
.../hadoop/hbase/security/TestSecureRPC.java | 9 +++++---
.../security/token/TestTokenAuthentication.java | 6 +++++-
13 files changed, 114 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6fbcd0a2/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index 09de871..c28f3e6 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -168,9 +168,13 @@ public class IntegrationTestRpcClient {
TestRpcServer rpcServer = new TestRpcServer(conf);
rpcServer.start();
- rpcServers.put(rpcServer.getListenerAddress(), rpcServer);
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ rpcServers.put(address, rpcServer);
serverList.add(rpcServer);
- LOG.info("Started server: " + rpcServer.getListenerAddress());
+ LOG.info("Started server: " + address);
return rpcServer;
} finally {
lock.writeLock().unlock();
@@ -187,7 +191,13 @@ public class IntegrationTestRpcClient {
int size = rpcServers.size();
int rand = random.nextInt(size);
rpcServer = serverList.remove(rand);
- rpcServers.remove(rpcServer.getListenerAddress());
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ // Throw exception here. We can't remove this instance from the server map because
+ // we no longer have access to its map key
+ throw new IOException("Listener channel is closed");
+ }
+ rpcServers.remove(address);
if (rpcServer != null) {
stopServer(rpcServer);
@@ -305,8 +315,12 @@ public class IntegrationTestRpcClient {
TestRpcServer server = cluster.getRandomServer();
try {
User user = User.getCurrent();
+ InetSocketAddress address = server.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
ret = (EchoResponseProto)
- rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress());
+ rpcClient.callBlockingMethod(md, null, param, ret, user, address);
} catch (Exception e) {
LOG.warn(e);
continue; // expected in case connection is closing or closed
http://git-wip-us.apache.org/repos/asf/hbase/blob/6fbcd0a2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 38b7c91..ede4b4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hbase.ipc;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import org.apache.commons.logging.Log;
@@ -96,8 +97,9 @@ public class CallRunner {
TraceScope traceScope = null;
try {
if (!this.rpcServer.isStarted()) {
- throw new ServerNotRunningYetException("Server " + rpcServer.getListenerAddress()
- + " is not running yet");
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ throw new ServerNotRunningYetException("Server " +
+ (address != null ? address : "(channel closed)") + " is not running yet");
}
if (call.tinfo != null) {
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
@@ -143,9 +145,10 @@ public class CallRunner {
throw e;
}
} catch (ClosedChannelException cce) {
+ InetSocketAddress address = rpcServer.getListenerAddress();
RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
- "this means that the server " + rpcServer.getListenerAddress() + " was processing a " +
- "request but the client went away. The error message was: " +
+ "this means that the server " + (address != null ? address : "(channel closed)") +
+ " was processing a request but the client went away. The error message was: " +
cce.getMessage());
} catch (Exception e) {
RpcServer.LOG.warn(Thread.currentThread().getName()
http://git-wip-us.apache.org/repos/asf/hbase/blob/6fbcd0a2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 2b67817..72ebda2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1797,8 +1797,9 @@ public class RpcServer implements RpcServerInterface {
responder, totalRequestSize, null, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
+ InetSocketAddress address = getListenerAddress();
setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
- "Call queue is full on " + getListenerAddress() +
+ "Call queue is full on " + (address != null ? address : "(channel closed)") +
", is hbase.ipc.server.max.callqueue.size too small?");
responder.doRespond(callTooBig);
return;
@@ -1826,8 +1827,9 @@ public class RpcServer implements RpcServerInterface {
buf, offset, buf.length);
}
} catch (Throwable t) {
- String msg = getListenerAddress() + " is unable to read call parameter from client " +
- getHostAddress();
+ InetSocketAddress address = getListenerAddress();
+ String msg = (address != null ? address : "(channel closed)") +
+ " is unable to read call parameter from client " + getHostAddress();
LOG.warn(msg, t);
metrics.exception(t);
@@ -2252,11 +2254,16 @@ public class RpcServer implements RpcServerInterface {
}
/**
- * Return the socket (ip+port) on which the RPC server is listening to.
- * @return the socket (ip+port) on which the RPC server is listening to.
+ * Return the socket (ip+port) on which the RPC server is listening to. May return null if
+ * the listener channel is closed.
+ * @return the socket (ip+port) on which the RPC server is listening to, or null if this
+ * information cannot be determined
*/
@Override
public synchronized InetSocketAddress getListenerAddress() {
+ if (listener == null) {
+ return null;
+ }
return listener.getAddress();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6fbcd0a2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 8822696..47b373a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -890,8 +890,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
// Set our address, however we need the final port that was given to rpcServer
- isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort());
+ isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
rpcServer.setErrorHandler(this);
rs.setName(name);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6fbcd0a2/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index d427419..dffd8e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -159,10 +159,13 @@ public abstract class AbstractTestIPC {
TestRpcServer rpcServer = new TestRpcServer();
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
final String message = "hello";
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
Pair<Message, CellScanner> r =
client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
@@ -200,12 +203,14 @@ public abstract class AbstractTestIPC {
TestRpcServer rpcServer = new TestRpcServer();
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
Pair<Message, CellScanner> r =
client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
@@ -231,9 +236,12 @@ public abstract class AbstractTestIPC {
AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
client.call(null, md, param, null, User.getCurrent(), address,
new MetricsConnection.CallStats());
fail("Expected an exception to have been thrown!");
@@ -258,10 +266,14 @@ public abstract class AbstractTestIPC {
verify(scheduler).start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (int i = 0; i < 10; i++) {
client.call(new PayloadCarryingRpcController(
CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
- md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(),
+ md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
}
verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
http://git-wip-us.apache.org/repos/asf/hbase/blob/6fbcd0a2/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
index 8921e89..d9b3e49 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
@@ -156,6 +156,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
@@ -192,6 +195,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
@@ -257,6 +263,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
for (int i = 0; i < cycles; i++) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/6fbcd0a2/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
index deee717..41ee4cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
@@ -91,9 +91,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), RPC_CLIENT_TIMEOUT);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -173,9 +176,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), RPC_CLIENT_TIMEOUT);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -297,9 +303,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), 1000);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
http://git-wip-us.apache.org/repos/asf/hbase/blob/6fbcd0a2/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index 227f91a..d3dbd33 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -125,9 +125,12 @@ public class TestIPC extends AbstractTestIPC {
rm.add(p);
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (int i = 0; i < cycles; i++) {
List<CellScannable> cells = new ArrayList<CellScannable>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
http://git-wip-us.apache.org/repos/asf/hbase/blob/6fbcd0a2/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
index fc2734f..ffb3927 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
@@ -100,7 +100,11 @@ public class TestProtoBufRpc {
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
new InetSocketAddress(ADDRESS, PORT), conf,
new FifoRpcScheduler(conf, 10));
- this.isa = server.getListenerAddress();
+ InetSocketAddress address = server.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ this.isa = address;
this.server.start();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6fbcd0a2/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
index dbf119d..03e9e4e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -179,9 +179,12 @@ public class TestRpcHandlerException {
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
PayloadCarryingRpcController controller =
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
-
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
- rpcServer.getListenerAddress(), new MetricsConnection.CallStats());
+ address, new MetricsConnection.CallStats());
} catch (Throwable e) {
assert(abortable.isAborted() == true);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/6fbcd0a2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
index 4ad2c31..9a48db7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -114,13 +115,16 @@ public class TestRSKilledWhenInitializing {
@Override
protected void handleReportForDutyResponse(RegionServerStartupResponse c) throws IOException {
if (firstRS.getAndSet(false)) {
+ InetSocketAddress address = super.getRpcServer().getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (NameStringPair e : c.getMapEntriesList()) {
String key = e.getName();
// The hostname the master sees us as.
if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
String hostnameFromMasterPOV = e.getValue();
- assertEquals(super.getRpcServer().getListenerAddress().getHostName(),
- hostnameFromMasterPOV);
+ assertEquals(address.getHostName(), hostnameFromMasterPOV);
}
}
while (!masterActive) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/6fbcd0a2/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
index b4dd62b..a940408 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
@@ -142,11 +142,14 @@ public class TestSecureRPC {
RpcClient rpcClient =
RpcClientFactory.createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel =
rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), rpcServer
- .getListenerAddress().getPort(), System.currentTimeMillis()), User.getCurrent(),
- 5000);
+ ServerName.valueOf(address.getHostName(), address.getPort(),
+ System.currentTimeMillis()), User.getCurrent(), 5000);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
List<Integer> results = new ArrayList<Integer>();
http://git-wip-us.apache.org/repos/asf/hbase/blob/6fbcd0a2/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index 423cd1d..50bc589 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -141,7 +141,11 @@ public class TestTokenAuthentication {
AuthenticationProtos.AuthenticationService.BlockingInterface.class));
this.rpcServer =
new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
- this.isa = this.rpcServer.getListenerAddress();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ this.isa = address;
this.sleeper = new Sleeper(1000, this);
}
[6/6] hbase git commit: HBASE-13318 RpcServer.getListenerAddress
should handle when the accept channel is closed
Posted by ap...@apache.org.
HBASE-13318 RpcServer.getListenerAddress should handle when the accept channel is closed
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/efb82957
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/efb82957
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/efb82957
Branch: refs/heads/master
Commit: efb82957da09ab06f5c887b3d62ad055bbba089f
Parents: 928dade
Author: Andrew Purtell <ap...@apache.org>
Authored: Mon Oct 26 14:24:42 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Oct 26 15:11:12 2015 -0700
----------------------------------------------------------------------
.../hbase/ipc/IntegrationTestRpcClient.java | 22 ++++++++++++++++----
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 11 ++++++----
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 17 ++++++++++-----
.../hbase/regionserver/RSRpcServices.java | 6 +++++-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 22 +++++++++++++++-----
.../apache/hadoop/hbase/ipc/TestAsyncIPC.java | 9 ++++++++
.../apache/hadoop/hbase/ipc/TestDelayedRpc.java | 21 +++++++++++++------
.../org/apache/hadoop/hbase/ipc/TestIPC.java | 5 ++++-
.../hadoop/hbase/ipc/TestProtoBufRpc.java | 6 +++++-
.../hbase/ipc/TestRpcHandlerException.java | 7 +++++--
.../TestRSKilledWhenInitializing.java | 8 +++++--
.../hadoop/hbase/security/TestSecureRPC.java | 9 +++++---
.../security/token/TestTokenAuthentication.java | 6 +++++-
13 files changed, 114 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index 09de871..c28f3e6 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -168,9 +168,13 @@ public class IntegrationTestRpcClient {
TestRpcServer rpcServer = new TestRpcServer(conf);
rpcServer.start();
- rpcServers.put(rpcServer.getListenerAddress(), rpcServer);
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ rpcServers.put(address, rpcServer);
serverList.add(rpcServer);
- LOG.info("Started server: " + rpcServer.getListenerAddress());
+ LOG.info("Started server: " + address);
return rpcServer;
} finally {
lock.writeLock().unlock();
@@ -187,7 +191,13 @@ public class IntegrationTestRpcClient {
int size = rpcServers.size();
int rand = random.nextInt(size);
rpcServer = serverList.remove(rand);
- rpcServers.remove(rpcServer.getListenerAddress());
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ // Throw exception here. We can't remove this instance from the server map because
+ // we no longer have access to its map key
+ throw new IOException("Listener channel is closed");
+ }
+ rpcServers.remove(address);
if (rpcServer != null) {
stopServer(rpcServer);
@@ -305,8 +315,12 @@ public class IntegrationTestRpcClient {
TestRpcServer server = cluster.getRandomServer();
try {
User user = User.getCurrent();
+ InetSocketAddress address = server.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
ret = (EchoResponseProto)
- rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress());
+ rpcClient.callBlockingMethod(md, null, param, ret, user, address);
} catch (Exception e) {
LOG.warn(e);
continue; // expected in case connection is closing or closed
http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 38b7c91..ede4b4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hbase.ipc;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import org.apache.commons.logging.Log;
@@ -96,8 +97,9 @@ public class CallRunner {
TraceScope traceScope = null;
try {
if (!this.rpcServer.isStarted()) {
- throw new ServerNotRunningYetException("Server " + rpcServer.getListenerAddress()
- + " is not running yet");
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ throw new ServerNotRunningYetException("Server " +
+ (address != null ? address : "(channel closed)") + " is not running yet");
}
if (call.tinfo != null) {
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
@@ -143,9 +145,10 @@ public class CallRunner {
throw e;
}
} catch (ClosedChannelException cce) {
+ InetSocketAddress address = rpcServer.getListenerAddress();
RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
- "this means that the server " + rpcServer.getListenerAddress() + " was processing a " +
- "request but the client went away. The error message was: " +
+ "this means that the server " + (address != null ? address : "(channel closed)") +
+ " was processing a request but the client went away. The error message was: " +
cce.getMessage());
} catch (Exception e) {
RpcServer.LOG.warn(Thread.currentThread().getName()
http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 8c08635..c20e972 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1813,8 +1813,9 @@ public class RpcServer implements RpcServerInterface {
responder, totalRequestSize, null, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
+ InetSocketAddress address = getListenerAddress();
setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
- "Call queue is full on " + getListenerAddress() +
+ "Call queue is full on " + (address != null ? address : "(channel closed)") +
", is hbase.ipc.server.max.callqueue.size too small?");
responder.doRespond(callTooBig);
return;
@@ -1842,8 +1843,9 @@ public class RpcServer implements RpcServerInterface {
buf, offset, buf.length);
}
} catch (Throwable t) {
- String msg = getListenerAddress() + " is unable to read call parameter from client " +
- getHostAddress();
+ InetSocketAddress address = getListenerAddress();
+ String msg = (address != null ? address : "(channel closed)") +
+ " is unable to read call parameter from client " + getHostAddress();
LOG.warn(msg, t);
metrics.exception(t);
@@ -2266,11 +2268,16 @@ public class RpcServer implements RpcServerInterface {
}
/**
- * Return the socket (ip+port) on which the RPC server is listening to.
- * @return the socket (ip+port) on which the RPC server is listening to.
+ * Return the socket (ip+port) on which the RPC server is listening to. May return null if
+ * the listener channel is closed.
+ * @return the socket (ip+port) on which the RPC server is listening to, or null if this
+ * information cannot be determined
*/
@Override
public synchronized InetSocketAddress getListenerAddress() {
+ if (listener == null) {
+ return null;
+ }
return listener.getAddress();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 38288ef..28bf069 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -972,8 +972,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
// Set our address, however we need the final port that was given to rpcServer
- isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort());
+ isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
rpcServer.setErrorHandler(this);
rs.setName(name);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index d427419..dffd8e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -159,10 +159,13 @@ public abstract class AbstractTestIPC {
TestRpcServer rpcServer = new TestRpcServer();
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
final String message = "hello";
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
Pair<Message, CellScanner> r =
client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
@@ -200,12 +203,14 @@ public abstract class AbstractTestIPC {
TestRpcServer rpcServer = new TestRpcServer();
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
Pair<Message, CellScanner> r =
client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
@@ -231,9 +236,12 @@ public abstract class AbstractTestIPC {
AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
client.call(null, md, param, null, User.getCurrent(), address,
new MetricsConnection.CallStats());
fail("Expected an exception to have been thrown!");
@@ -258,10 +266,14 @@ public abstract class AbstractTestIPC {
verify(scheduler).start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (int i = 0; i < 10; i++) {
client.call(new PayloadCarryingRpcController(
CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
- md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(),
+ md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats());
}
verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
index d761ae9..b9d390a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
@@ -157,6 +157,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
@@ -193,6 +196,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
@@ -258,6 +264,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
for (int i = 0; i < cycles; i++) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
index 961001f..d379722 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
@@ -92,9 +92,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), RPC_CLIENT_TIMEOUT);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -174,9 +177,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), RPC_CLIENT_TIMEOUT);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -298,9 +304,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), 1000);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index d1b8202..3fc1259 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -122,9 +122,12 @@ public class TestIPC extends AbstractTestIPC {
rm.add(p);
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (int i = 0; i < cycles; i++) {
List<CellScannable> cells = new ArrayList<CellScannable>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
index cee459f..81869b4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
@@ -101,7 +101,11 @@ public class TestProtoBufRpc {
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
new InetSocketAddress(ADDRESS, PORT), conf,
new FifoRpcScheduler(conf, 10));
- this.isa = server.getListenerAddress();
+ InetSocketAddress address = server.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ this.isa = address;
this.server.start();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
index a4e55d9..a37ba11 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -180,9 +180,12 @@ public class TestRpcHandlerException {
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
PayloadCarryingRpcController controller =
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
-
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
- rpcServer.getListenerAddress(), new MetricsConnection.CallStats());
+ address, new MetricsConnection.CallStats());
} catch (Throwable e) {
assert(abortable.isAborted() == true);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
index 97e69b7..a3ac177 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -115,13 +116,16 @@ public class TestRSKilledWhenInitializing {
@Override
protected void handleReportForDutyResponse(RegionServerStartupResponse c) throws IOException {
if (firstRS.getAndSet(false)) {
+ InetSocketAddress address = super.getRpcServer().getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (NameStringPair e : c.getMapEntriesList()) {
String key = e.getName();
// The hostname the master sees us as.
if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
String hostnameFromMasterPOV = e.getValue();
- assertEquals(super.getRpcServer().getListenerAddress().getHostName(),
- hostnameFromMasterPOV);
+ assertEquals(address.getHostName(), hostnameFromMasterPOV);
}
}
while (!masterActive) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
index 8eff063..66b8c75 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
@@ -143,11 +143,14 @@ public class TestSecureRPC {
RpcClient rpcClient =
RpcClientFactory.createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel =
rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), rpcServer
- .getListenerAddress().getPort(), System.currentTimeMillis()), User.getCurrent(),
- 5000);
+ ServerName.valueOf(address.getHostName(), address.getPort(),
+ System.currentTimeMillis()), User.getCurrent(), 5000);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
List<Integer> results = new ArrayList<Integer>();
http://git-wip-us.apache.org/repos/asf/hbase/blob/efb82957/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index c83e502..69c6e63 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -141,7 +141,11 @@ public class TestTokenAuthentication {
AuthenticationProtos.AuthenticationService.BlockingInterface.class));
this.rpcServer =
new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
- this.isa = this.rpcServer.getListenerAddress();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ this.isa = address;
this.sleeper = new Sleeper(1000, this);
}
[3/6] hbase git commit: HBASE-13318 RpcServer.getListenerAddress
should handle when the accept channel is closed
Posted by ap...@apache.org.
HBASE-13318 RpcServer.getListenerAddress should handle when the accept channel is closed
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bbb2f1b0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bbb2f1b0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bbb2f1b0
Branch: refs/heads/branch-1.0
Commit: bbb2f1b0035a9e659c9b7bd7b10515a66881c2ca
Parents: 5424217
Author: Andrew Purtell <ap...@apache.org>
Authored: Mon Oct 26 14:31:28 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Oct 26 15:11:09 2015 -0700
----------------------------------------------------------------------
.../hbase/ipc/IntegrationTestRpcClient.java | 22 ++++++++++++++++----
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 11 ++++++----
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 17 ++++++++++-----
.../hbase/regionserver/RSRpcServices.java | 6 +++++-
.../apache/hadoop/hbase/ipc/TestDelayedRpc.java | 21 +++++++++++++------
.../org/apache/hadoop/hbase/ipc/TestIPC.java | 5 ++++-
.../hadoop/hbase/ipc/TestProtoBufRpc.java | 6 +++++-
.../hbase/ipc/TestRpcHandlerException.java | 6 +++++-
.../TestRSKilledWhenInitializing.java | 8 +++++--
.../hadoop/hbase/security/TestSecureRPC.java | 11 ++++++----
.../security/token/TestTokenAuthentication.java | 6 +++++-
11 files changed, 89 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index 70538b9..c790b4d 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -162,9 +162,13 @@ public class IntegrationTestRpcClient {
TestRpcServer rpcServer = new TestRpcServer(conf);
rpcServer.start();
- rpcServers.put(rpcServer.getListenerAddress(), rpcServer);
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ rpcServers.put(address, rpcServer);
serverList.add(rpcServer);
- LOG.info("Started server: " + rpcServer.getListenerAddress());
+ LOG.info("Started server: " + address);
return rpcServer;
} finally {
lock.writeLock().unlock();
@@ -181,7 +185,13 @@ public class IntegrationTestRpcClient {
int size = rpcServers.size();
int rand = random.nextInt(size);
rpcServer = serverList.remove(rand);
- rpcServers.remove(rpcServer.getListenerAddress());
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ // Throw exception here. We can't remove this instance from the server map because
+ // we no longer have access to its map key
+ throw new IOException("Listener channel is closed");
+ }
+ rpcServers.remove(address);
if (rpcServer != null) {
stopServer(rpcServer);
@@ -299,8 +309,12 @@ public class IntegrationTestRpcClient {
TestRpcServer server = cluster.getRandomServer();
try {
User user = User.getCurrent();
+ InetSocketAddress address = server.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
ret = (EchoResponseProto)
- rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress());
+ rpcClient.callBlockingMethod(md, null, param, ret, user, address);
} catch (Exception e) {
LOG.warn(e);
continue; // expected in case connection is closing or closed
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index ef6fa88..51e3fba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hbase.ipc;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.hbase.CellScanner;
@@ -95,8 +96,9 @@ public class CallRunner {
TraceScope traceScope = null;
try {
if (!this.rpcServer.isStarted()) {
- throw new ServerNotRunningYetException("Server " + rpcServer.getListenerAddress()
- + " is not running yet");
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ throw new ServerNotRunningYetException("Server " +
+ (address != null ? address : "(channel closed)") + " is not running yet");
}
if (call.tinfo != null) {
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
@@ -143,9 +145,10 @@ public class CallRunner {
throw e;
}
} catch (ClosedChannelException cce) {
+ InetSocketAddress address = rpcServer.getListenerAddress();
RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
- "this means that the server " + rpcServer.getListenerAddress() + " was processing a " +
- "request but the client went away. The error message was: " +
+ "this means that the server " + (address != null ? address : "(channel closed)") +
+ " was processing a request but the client went away. The error message was: " +
cce.getMessage());
} catch (Exception e) {
RpcServer.LOG.warn(Thread.currentThread().getName()
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index a22a15d..48ca2e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1730,8 +1730,9 @@ public class RpcServer implements RpcServerInterface {
new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+ InetSocketAddress address = getListenerAddress();
setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
- "Call queue is full on " + getListenerAddress() +
+ "Call queue is full on " + (address != null ? address : "(channel closed)") +
", is hbase.ipc.server.max.callqueue.size too small?");
responder.doRespond(callTooBig);
return;
@@ -1759,8 +1760,9 @@ public class RpcServer implements RpcServerInterface {
buf, offset, buf.length);
}
} catch (Throwable t) {
- String msg = getListenerAddress() + " is unable to read call parameter from client " +
- getHostAddress();
+ InetSocketAddress address = getListenerAddress();
+ String msg = (address != null ? address : "(channel closed)") +
+ " is unable to read call parameter from client " + getHostAddress();
LOG.warn(msg, t);
// probably the hbase hadoop version does not match the running hadoop version
@@ -2165,11 +2167,16 @@ public class RpcServer implements RpcServerInterface {
}
/**
- * Return the socket (ip+port) on which the RPC server is listening to.
- * @return the socket (ip+port) on which the RPC server is listening to.
+ * Return the socket (ip+port) on which the RPC server is listening to. May return null if
+ * the listener channel is closed.
+ * @return the socket (ip+port) on which the RPC server is listening to, or null if this
+ * information cannot be determined
*/
@Override
public synchronized InetSocketAddress getListenerAddress() {
+ if (listener == null) {
+ return null;
+ }
return listener.getAddress();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 5a18db1..da69595 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -813,8 +813,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
// Set our address, however we need the final port that was given to rpcServer
- isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort());
+ isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
rpcServer.setErrorHandler(this);
rs.setName(name);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
index deee717..41ee4cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
@@ -91,9 +91,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), RPC_CLIENT_TIMEOUT);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -173,9 +176,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), RPC_CLIENT_TIMEOUT);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -297,9 +303,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), 1000);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index 2c70eb4..1ecb200 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -321,9 +321,12 @@ public class TestIPC {
rm.add(p);
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (int i = 0; i < cycles; i++) {
List<CellScannable> cells = new ArrayList<CellScannable>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
index fc2734f..ffb3927 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
@@ -100,7 +100,11 @@ public class TestProtoBufRpc {
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
new InetSocketAddress(ADDRESS, PORT), conf,
new FifoRpcScheduler(conf, 10));
- this.isa = server.getListenerAddress();
+ InetSocketAddress address = server.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ this.isa = address;
this.server.start();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
index 9cb1cc5..193a217 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -177,8 +177,12 @@ public class TestRpcHandlerException {
rpcServer.start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
client.call(null, md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)), md
- .getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0);
+ .getOutputType().toProto(), User.getCurrent(), address, 0);
} catch (Throwable e) {
assert(abortable.isAborted() == true);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
index 4ad2c31..9a48db7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -114,13 +115,16 @@ public class TestRSKilledWhenInitializing {
@Override
protected void handleReportForDutyResponse(RegionServerStartupResponse c) throws IOException {
if (firstRS.getAndSet(false)) {
+ InetSocketAddress address = super.getRpcServer().getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (NameStringPair e : c.getMapEntriesList()) {
String key = e.getName();
// The hostname the master sees us as.
if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
String hostnameFromMasterPOV = e.getValue();
- assertEquals(super.getRpcServer().getListenerAddress().getHostName(),
- hostnameFromMasterPOV);
+ assertEquals(address.getHostName(), hostnameFromMasterPOV);
}
}
while (!masterActive) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
index a3cae76..e5b7996 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
@@ -22,11 +22,11 @@ import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileF
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.isKerberosPropertySetted;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assume.assumeTrue;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@@ -102,10 +102,13 @@ public class TestSecureRPC {
RpcClient rpcClient = RpcClientFactory
.createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
- User.getCurrent(), 1000);
+ ServerName.valueOf(address.getHostName(), address.getPort(),
+ System.currentTimeMillis()), User.getCurrent(), 1000);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
List<Integer> results = new ArrayList<Integer>();
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbb2f1b0/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index 40df515..bae7a5f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -141,7 +141,11 @@ public class TestTokenAuthentication {
AuthenticationProtos.AuthenticationService.BlockingInterface.class));
this.rpcServer =
new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
- this.isa = this.rpcServer.getListenerAddress();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ this.isa = address;
this.sleeper = new Sleeper(1000, this);
}
[4/6] hbase git commit: HBASE-13318 RpcServer.getListenerAddress should handle when the accept channel is closed
Posted by ap...@apache.org.
HBASE-13318 RpcServer.getListenerAddress should handle when the accept channel is closed
Conflicts:
hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b7560581
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b7560581
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b7560581
Branch: refs/heads/branch-1.1
Commit: b7560581752b477df6f824382461d5fb8fc41fd2
Parents: 9b71dac
Author: Andrew Purtell <ap...@apache.org>
Authored: Mon Oct 26 14:31:28 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Oct 26 15:11:10 2015 -0700
----------------------------------------------------------------------
.../hbase/ipc/IntegrationTestRpcClient.java | 22 ++++++++++++++++----
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 11 ++++++----
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 17 ++++++++++-----
.../hbase/regionserver/RSRpcServices.java | 6 +++++-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 22 +++++++++++++++-----
.../apache/hadoop/hbase/ipc/TestAsyncIPC.java | 9 ++++++++
.../apache/hadoop/hbase/ipc/TestDelayedRpc.java | 21 +++++++++++++------
.../org/apache/hadoop/hbase/ipc/TestIPC.java | 5 ++++-
.../hadoop/hbase/ipc/TestProtoBufRpc.java | 6 +++++-
.../hbase/ipc/TestRpcHandlerException.java | 7 +++++--
.../TestRSKilledWhenInitializing.java | 8 +++++--
.../hadoop/hbase/security/TestSecureRPC.java | 9 +++++---
.../security/token/TestTokenAuthentication.java | 6 +++++-
13 files changed, 114 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index a99df88..1b425b8 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -168,9 +168,13 @@ public class IntegrationTestRpcClient {
TestRpcServer rpcServer = new TestRpcServer(conf);
rpcServer.start();
- rpcServers.put(rpcServer.getListenerAddress(), rpcServer);
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ rpcServers.put(address, rpcServer);
serverList.add(rpcServer);
- LOG.info("Started server: " + rpcServer.getListenerAddress());
+ LOG.info("Started server: " + address);
return rpcServer;
} finally {
lock.writeLock().unlock();
@@ -187,7 +191,13 @@ public class IntegrationTestRpcClient {
int size = rpcServers.size();
int rand = random.nextInt(size);
rpcServer = serverList.remove(rand);
- rpcServers.remove(rpcServer.getListenerAddress());
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ // Throw exception here. We can't remove this instance from the server map because
+ // we no longer have access to its map key
+ throw new IOException("Listener channel is closed");
+ }
+ rpcServers.remove(address);
if (rpcServer != null) {
stopServer(rpcServer);
@@ -305,8 +315,12 @@ public class IntegrationTestRpcClient {
TestRpcServer server = cluster.getRandomServer();
try {
User user = User.getCurrent();
+ InetSocketAddress address = server.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
ret = (EchoResponseProto)
- rpcClient.callBlockingMethod(md, null, param, ret, user, server.getListenerAddress());
+ rpcClient.callBlockingMethod(md, null, param, ret, user, address);
} catch (Exception e) {
LOG.warn(e);
continue; // expected in case connection is closing or closed
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index e2274e9..e329ef0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hbase.ipc;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.hbase.CellScanner;
@@ -91,8 +92,9 @@ public class CallRunner {
TraceScope traceScope = null;
try {
if (!this.rpcServer.isStarted()) {
- throw new ServerNotRunningYetException("Server " + rpcServer.getListenerAddress()
- + " is not running yet");
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ throw new ServerNotRunningYetException("Server " +
+ (address != null ? address : "(channel closed)") + " is not running yet");
}
if (call.tinfo != null) {
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
@@ -134,9 +136,10 @@ public class CallRunner {
throw e;
}
} catch (ClosedChannelException cce) {
+ InetSocketAddress address = rpcServer.getListenerAddress();
RpcServer.LOG.warn(Thread.currentThread().getName() + ": caught a ClosedChannelException, " +
- "this means that the server " + rpcServer.getListenerAddress() + " was processing a " +
- "request but the client went away. The error message was: " +
+ "this means that the server " + (address != null ? address : "(channel closed)") +
+ " was processing a request but the client went away. The error message was: " +
cce.getMessage());
} catch (Exception e) {
RpcServer.LOG.warn(Thread.currentThread().getName()
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 47d9825..e3ec22f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1791,8 +1791,9 @@ public class RpcServer implements RpcServerInterface {
responder, totalRequestSize, null, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
+ InetSocketAddress address = getListenerAddress();
setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
- "Call queue is full on " + getListenerAddress() +
+ "Call queue is full on " + (address != null ? address : "(channel closed)") +
", is hbase.ipc.server.max.callqueue.size too small?");
responder.doRespond(callTooBig);
return;
@@ -1820,8 +1821,9 @@ public class RpcServer implements RpcServerInterface {
buf, offset, buf.length);
}
} catch (Throwable t) {
- String msg = getListenerAddress() + " is unable to read call parameter from client " +
- getHostAddress();
+ InetSocketAddress address = getListenerAddress();
+ String msg = (address != null ? address : "(channel closed)") +
+ " is unable to read call parameter from client " + getHostAddress();
LOG.warn(msg, t);
metrics.exception(t);
@@ -2241,11 +2243,16 @@ public class RpcServer implements RpcServerInterface {
}
/**
- * Return the socket (ip+port) on which the RPC server is listening to.
- * @return the socket (ip+port) on which the RPC server is listening to.
+ * Return the socket (ip+port) on which the RPC server is listening to. May return null if
+ * the listener channel is closed.
+ * @return the socket (ip+port) on which the RPC server is listening to, or null if this
+ * information cannot be determined
*/
@Override
public synchronized InetSocketAddress getListenerAddress() {
+ if (listener == null) {
+ return null;
+ }
return listener.getAddress();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 61fb87b..ec4f88e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -878,8 +878,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
// Set our address, however we need the final port that was given to rpcServer
- isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort());
+ isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
rpcServer.setErrorHandler(this);
rs.setName(name);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 32eb9f6..528939d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -158,10 +158,13 @@ public abstract class AbstractTestIPC {
TestRpcServer rpcServer = new TestRpcServer();
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
final String message = "hello";
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
Pair<Message, CellScanner> r =
client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
assertTrue(r.getSecond() == null);
@@ -198,12 +201,14 @@ public abstract class AbstractTestIPC {
TestRpcServer rpcServer = new TestRpcServer();
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
Pair<Message, CellScanner> r =
client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
int index = 0;
@@ -228,9 +233,12 @@ public abstract class AbstractTestIPC {
AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
client.call(null, md, param, null, User.getCurrent(), address);
fail("Expected an exception to have been thrown!");
} catch (Exception e) {
@@ -254,11 +262,15 @@ public abstract class AbstractTestIPC {
verify(scheduler).start();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (int i = 0; i < 10; i++) {
client.call(
new PayloadCarryingRpcController(
CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param, md
- .getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress());
+ .getOutputType().toProto(), User.getCurrent(), address);
}
verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
index ca7c9a7..891acc3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
@@ -157,6 +157,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
@@ -193,6 +196,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
@@ -258,6 +264,9 @@ public class TestAsyncIPC extends AbstractTestIPC {
try {
rpcServer.start();
InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
for (int i = 0; i < cycles; i++) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
index deee717..41ee4cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
@@ -91,9 +91,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), RPC_CLIENT_TIMEOUT);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -173,9 +176,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), RPC_CLIENT_TIMEOUT);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
@@ -297,9 +303,12 @@ public class TestDelayedRpc {
RpcClient rpcClient = RpcClientFactory.createClient(
conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
- rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()),
+ ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
User.getCurrent(), 1000);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index af10058..6975c6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -124,9 +124,12 @@ public class TestIPC extends AbstractTestIPC {
rm.add(p);
try {
rpcServer.start();
- InetSocketAddress address = rpcServer.getListenerAddress();
long startTime = System.currentTimeMillis();
User user = User.getCurrent();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (int i = 0; i < cycles; i++) {
List<CellScannable> cells = new ArrayList<CellScannable>();
// Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
index fc2734f..ffb3927 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
@@ -100,7 +100,11 @@ public class TestProtoBufRpc {
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
new InetSocketAddress(ADDRESS, PORT), conf,
new FifoRpcScheduler(conf, 10));
- this.isa = server.getListenerAddress();
+ InetSocketAddress address = server.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ this.isa = address;
this.server.start();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
index 2c21ebd..c72313d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -178,9 +178,12 @@ public class TestRpcHandlerException {
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
PayloadCarryingRpcController controller =
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
-
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
- rpcServer.getListenerAddress());
+ address);
} catch (Throwable e) {
assert(abortable.isAborted() == true);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
index 4ad2c31..9a48db7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -114,13 +115,16 @@ public class TestRSKilledWhenInitializing {
@Override
protected void handleReportForDutyResponse(RegionServerStartupResponse c) throws IOException {
if (firstRS.getAndSet(false)) {
+ InetSocketAddress address = super.getRpcServer().getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
for (NameStringPair e : c.getMapEntriesList()) {
String key = e.getName();
// The hostname the master sees us as.
if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
String hostnameFromMasterPOV = e.getValue();
- assertEquals(super.getRpcServer().getListenerAddress().getHostName(),
- hostnameFromMasterPOV);
+ assertEquals(address.getHostName(), hostnameFromMasterPOV);
}
}
while (!masterActive) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
index b4dd62b..a940408 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureRPC.java
@@ -142,11 +142,14 @@ public class TestSecureRPC {
RpcClient rpcClient =
RpcClientFactory.createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
try {
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
BlockingRpcChannel channel =
rpcClient.createBlockingRpcChannel(
- ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), rpcServer
- .getListenerAddress().getPort(), System.currentTimeMillis()), User.getCurrent(),
- 5000);
+ ServerName.valueOf(address.getHostName(), address.getPort(),
+ System.currentTimeMillis()), User.getCurrent(), 5000);
TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
List<Integer> results = new ArrayList<Integer>();
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7560581/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
index b0eb3aa..e068a08 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
@@ -141,7 +141,11 @@ public class TestTokenAuthentication {
AuthenticationProtos.AuthenticationService.BlockingInterface.class));
this.rpcServer =
new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
- this.isa = this.rpcServer.getListenerAddress();
+ InetSocketAddress address = rpcServer.getListenerAddress();
+ if (address == null) {
+ throw new IOException("Listener channel is closed");
+ }
+ this.isa = address;
this.sleeper = new Sleeper(1000, this);
}