You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/12/09 06:15:39 UTC
[2/3] accumulo git commit: ACCUMULO-3394 Change thrift package to rpc
and make an rpc package in core too
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
index 3a435c7..09ae4f4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
@@ -20,8 +20,8 @@ import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.core.util.SslConnectionParams;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.security.SystemCredentials;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index 593d9b7..f338db8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.data.thrift.TKeyExtent;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.trace.wrappers.TraceRunnable;
@@ -59,7 +60,6 @@ import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.LoggingRunnable;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.util.StopWatch;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
index d392bde..5eea41c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
@@ -31,12 +31,12 @@ import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.ServerServices;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.util.Halt;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
index cd25d49..9822d0f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TabletBalancer.java
@@ -26,11 +26,11 @@ import java.util.SortedMap;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.trace.Tracer;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/ClientInfoProcessorFactory.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ClientInfoProcessorFactory.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ClientInfoProcessorFactory.java
new file mode 100644
index 0000000..5f630c2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ClientInfoProcessorFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import org.apache.accumulo.core.rpc.TBufferedSocket;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sets the address of a client in a ThreadLocal to allow for more informative log messages.
+ */
+public class ClientInfoProcessorFactory extends TProcessorFactory {
+ private static final Logger log = LoggerFactory.getLogger(ClientInfoProcessorFactory.class);
+
+ private final ThreadLocal<String> clientAddress;
+
+ public ClientInfoProcessorFactory(ThreadLocal<String> clientAddress, TProcessor processor) {
+ super(processor);
+ this.clientAddress = clientAddress;
+ }
+
+ @Override
+ public TProcessor getProcessor(TTransport trans) {
+ if (trans instanceof TBufferedSocket) {
+ TBufferedSocket tsock = (TBufferedSocket) trans;
+ clientAddress.set(tsock.getClientString());
+ } else if (trans instanceof TSocket) {
+ TSocket tsock = (TSocket) trans;
+ clientAddress.set(tsock.getSocket().getInetAddress().getHostAddress() + ":" + tsock.getSocket().getPort());
+ } else {
+ log.warn("Unable to extract clientAddress from transport of type {}", trans.getClass());
+ }
+ return super.getProcessor(trans);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
new file mode 100644
index 0000000..21f55b3
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * This class implements a custom non-blocking thrift server, incorporating the {@link THsHaServer} features, and overriding the underlying
+ * {@link TNonblockingServer} methods, especially {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread}, in order to override the
+ * {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} with
+ * one that reveals the client address from its transport.
+ *
+ * <p>
+ * The justification for this is explained in https://issues.apache.org/jira/browse/ACCUMULO-1691, and is needed due to the repeated regressions:
+ * <ul>
+ * <li>https://issues.apache.org/jira/browse/THRIFT-958</li>
+ * <li>https://issues.apache.org/jira/browse/THRIFT-1464</li>
+ * <li>https://issues.apache.org/jira/browse/THRIFT-2173</li>
+ * </ul>
+ *
+ * <p>
+ * This class contains a copy of {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread} from Thrift 0.9.1, with the slight modification of
+ * instantiating a custom FrameBuffer, rather than the {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and
+ * {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer}. Because of this, any change in the implementation upstream will require a review
+ * of this implementation here, to ensure any new bugfixes/features in the upstream Thrift class are also applied here, at least until
+ * https://issues.apache.org/jira/browse/THRIFT-2173 is implemented. In the meantime, the maven-enforcer-plugin ensures that Thrift remains at version 0.9.1,
+ * which has been reviewed and tested.
+ */
+public class CustomNonBlockingServer extends THsHaServer {
+
+ private static final Logger LOGGER = Logger.getLogger(CustomNonBlockingServer.class);
+ private SelectAcceptThread selectAcceptThread_;
+ private volatile boolean stopped_ = false;
+
+ public CustomNonBlockingServer(Args args) {
+ super(args);
+ }
+
+ @Override
+ protected Runnable getRunnable(final FrameBuffer frameBuffer) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ if (frameBuffer instanceof CustomNonblockingFrameBuffer) {
+ TNonblockingTransport trans = ((CustomNonblockingFrameBuffer) frameBuffer).getTransport();
+ if (trans instanceof TNonblockingSocket) {
+ TNonblockingSocket tsock = (TNonblockingSocket) trans;
+ Socket sock = tsock.getSocketChannel().socket();
+ TServerUtils.clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
+ }
+ }
+ frameBuffer.invoke();
+ }
+ };
+ }
+
+ @Override
+ protected boolean startThreads() {
+ // start the selector
+ try {
+ selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport) serverTransport_);
+ selectAcceptThread_.start();
+ return true;
+ } catch (IOException e) {
+ LOGGER.error("Failed to start selector thread!", e);
+ return false;
+ }
+ }
+
+ @Override
+ public void stop() {
+ stopped_ = true;
+ if (selectAcceptThread_ != null) {
+ selectAcceptThread_.wakeupSelector();
+ }
+ }
+
+ @Override
+ public boolean isStopped() {
+ return selectAcceptThread_.isStopped();
+ }
+
+ @Override
+ protected void joinSelector() {
+ // wait until the selector thread exits
+ try {
+ selectAcceptThread_.join();
+ } catch (InterruptedException e) {
+ // for now, just silently ignore. technically this means we'll have less of
+ // a graceful shutdown as a result.
+ }
+ }
+
+ private interface CustomNonblockingFrameBuffer {
+ TNonblockingTransport getTransport();
+ }
+
+ private class CustomAsyncFrameBuffer extends AsyncFrameBuffer implements CustomNonblockingFrameBuffer {
+ private TNonblockingTransport trans;
+
+ public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+ super(trans, selectionKey, selectThread);
+ this.trans = trans;
+ }
+
+ @Override
+ public TNonblockingTransport getTransport() {
+ return trans;
+ }
+ }
+
+ private class CustomFrameBuffer extends FrameBuffer implements CustomNonblockingFrameBuffer {
+ private TNonblockingTransport trans;
+
+ public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+ super(trans, selectionKey, selectThread);
+ this.trans = trans;
+ }
+
+ @Override
+ public TNonblockingTransport getTransport() {
+ return trans;
+ }
+ }
+
+ // @formatter:off
+ private class SelectAcceptThread extends AbstractSelectThread {
+
+ // The server transport on which new client transports will be accepted
+ private final TNonblockingServerTransport serverTransport;
+
+ /**
+ * Set up the thread that will handle the non-blocking accepts, reads, and
+ * writes.
+ */
+ public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
+ throws IOException {
+ this.serverTransport = serverTransport;
+ serverTransport.registerSelector(selector);
+ }
+
+ public boolean isStopped() {
+ return stopped_;
+ }
+
+ /**
+ * The work loop. Handles both selecting (all IO operations) and managing
+ * the selection preferences of all existing connections.
+ */
+ @Override
+ public void run() {
+ try {
+ if (eventHandler_ != null) {
+ eventHandler_.preServe();
+ }
+
+ while (!stopped_) {
+ select();
+ processInterestChanges();
+ }
+ for (SelectionKey selectionKey : selector.keys()) {
+ cleanupSelectionKey(selectionKey);
+ }
+ } catch (Throwable t) {
+ LOGGER.error("run() exiting due to uncaught error", t);
+ } finally {
+ stopped_ = true;
+ }
+ }
+
+ /**
+ * Select and process IO events appropriately:
+ * If there are connections to be accepted, accept them.
+ * If there are existing connections with data waiting to be read, read it,
+ * buffering until a whole frame has been read.
+ * If there are any pending responses, buffer them until their target client
+ * is available, and then send the data.
+ */
+ private void select() {
+ try {
+ // wait for io events.
+ selector.select();
+
+ // process the io events we received
+ Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+ while (!stopped_ && selectedKeys.hasNext()) {
+ SelectionKey key = selectedKeys.next();
+ selectedKeys.remove();
+
+ // skip if not valid
+ if (!key.isValid()) {
+ cleanupSelectionKey(key);
+ continue;
+ }
+
+ // if the key is marked Accept, then it has to be the server
+ // transport.
+ if (key.isAcceptable()) {
+ handleAccept();
+ } else if (key.isReadable()) {
+ // deal with reads
+ handleRead(key);
+ } else if (key.isWritable()) {
+ // deal with writes
+ handleWrite(key);
+ } else {
+ LOGGER.warn("Unexpected state in select! " + key.interestOps());
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Got an IOException while selecting!", e);
+ }
+ }
+
+ /**
+ * Accept a new connection.
+ */
+ @SuppressWarnings("unused")
+ private void handleAccept() throws IOException {
+ SelectionKey clientKey = null;
+ TNonblockingTransport client = null;
+ try {
+ // accept the connection
+ client = (TNonblockingTransport)serverTransport.accept();
+ clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
+
+ // add this key to the map
+ FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
+ new CustomAsyncFrameBuffer(client, clientKey,SelectAcceptThread.this) :
+ new CustomFrameBuffer(client, clientKey,SelectAcceptThread.this);
+
+ clientKey.attach(frameBuffer);
+ } catch (TTransportException tte) {
+ // something went wrong accepting.
+ LOGGER.warn("Exception trying to accept!", tte);
+ tte.printStackTrace();
+ if (clientKey != null) cleanupSelectionKey(clientKey);
+ if (client != null) client.close();
+ }
+ }
+ } // SelectAcceptThread
+ // @formatter:on
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
new file mode 100644
index 0000000..7b34986
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
+import org.apache.accumulo.core.trace.wrappers.TraceWrap;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class accommodates the changes in THRIFT-1805, which appeared in Thrift 0.9.1 and restricts client-side notification of server-side errors to
+ * {@link TException} only, by wrapping {@link RuntimeException} and {@link Error} as {@link TException}, so it doesn't just close the connection and look like
+ * a network issue, but informs the client that a {@link TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs similar functions as
+ * {@link TraceWrap}, but with the additional action of translating exceptions. See also ACCUMULO-1691 and ACCUMULO-2950.
+ *
+ * @since 1.6.1
+ */
+public class RpcWrapper {
+
+ public static <T> T service(final T instance) {
+ InvocationHandler handler = new RpcServerInvocationHandler<T>(instance) {
+ @Override
+ public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
+ try {
+ return super.invoke(obj, method, args);
+ } catch (RuntimeException e) {
+ String msg = e.getMessage();
+ LoggerFactory.getLogger(instance.getClass()).error(msg, e);
+ throw new TException(msg);
+ } catch (Error e) {
+ String msg = e.getMessage();
+ LoggerFactory.getLogger(instance.getClass()).error(msg, e);
+ throw new TException(msg);
+ }
+ }
+ };
+
+ @SuppressWarnings("unchecked")
+ T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler);
+ return proxiedInstance;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java
new file mode 100644
index 0000000..b655287
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ServerAddress.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import org.apache.thrift.server.TServer;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Encapsulate a Thrift server and the address, host and port, to which it is bound.
+ */
+public class ServerAddress {
+ public final TServer server;
+ public final HostAndPort address;
+
+ public ServerAddress(TServer server, HostAndPort address) {
+ this.server = server;
+ this.address = address;
+ }
+
+ public TServer getServer() {
+ return server;
+ }
+
+ public HostAndPort getAddress() {
+ return address;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/TBufferedServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TBufferedServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TBufferedServerSocket.java
new file mode 100644
index 0000000..2887f48
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TBufferedServerSocket.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+import org.apache.accumulo.core.rpc.TBufferedSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+// Thrift-959 removed the small buffer from TSocket; this adds it back for servers
+public class TBufferedServerSocket extends TServerTransport {
+
+ // expose acceptImpl
+ static class TServerSocket extends org.apache.thrift.transport.TServerSocket {
+ public TServerSocket(ServerSocket serverSocket) {
+ super(serverSocket);
+ }
+
+ public TSocket acceptImplPublic() throws TTransportException {
+ return acceptImpl();
+ }
+ }
+
+ final TServerSocket impl;
+ final int bufferSize;
+
+ public TBufferedServerSocket(ServerSocket serverSocket, int bufferSize) {
+ this.impl = new TServerSocket(serverSocket);
+ this.bufferSize = bufferSize;
+ }
+
+ @Override
+ public void listen() throws TTransportException {
+ impl.listen();
+ }
+
+ @Override
+ public void close() {
+ impl.close();
+ }
+
+ // Wrap accepted sockets using buffered IO
+ @Override
+ protected TTransport acceptImpl() throws TTransportException {
+ TSocket sock = impl.acceptImplPublic();
+ try {
+ return new TBufferedSocket(sock, this.bufferSize);
+ } catch (IOException e) {
+ throw new TTransportException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TNonblockingServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TNonblockingServerSocket.java
new file mode 100644
index 0000000..3afe149
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TNonblockingServerSocket.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.accumulo.server.rpc;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Wrapper around ServerSocketChannel.
+ *
+ * This class is copied from org.apache.thrift.transport.TNonblockingServerSocket version 0.9.
+ * The only change (apart from the logging statements) is the addition of the {@link #getPort()} method to retrieve the port used by the ServerSocket.
+ */
+public class TNonblockingServerSocket extends TNonblockingServerTransport {
+ private static final Logger log = Logger.getLogger(TNonblockingServerTransport.class.getName());
+
+ /**
+ * This channel is where all the nonblocking magic happens.
+ */
+ private ServerSocketChannel serverSocketChannel = null;
+
+ /**
+ * Underlying ServerSocket object
+ */
+ private ServerSocket serverSocket_ = null;
+
+ /**
+ * Timeout for client sockets from accept
+ */
+ private int clientTimeout_ = 0;
+
+ /**
+ * Creates just a port listening server socket
+ */
+ public TNonblockingServerSocket(int port) throws TTransportException {
+ this(port, 0);
+ }
+
+ /**
+ * Creates just a port listening server socket
+ */
+ public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
+ this(new InetSocketAddress(port), clientTimeout);
+ }
+
+ public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
+ this(bindAddr, 0);
+ }
+
+ public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
+ clientTimeout_ = clientTimeout;
+ try {
+ serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(false);
+
+ // Make server socket
+ serverSocket_ = serverSocketChannel.socket();
+ // Prevent 2MSL delay problem on server restarts
+ serverSocket_.setReuseAddress(true);
+ // Bind to listening port
+ serverSocket_.bind(bindAddr);
+ } catch (IOException ioe) {
+ serverSocket_ = null;
+ throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
+ }
+ }
+
+ public void listen() throws TTransportException {
+ // Make sure not to block on accept
+ if (serverSocket_ != null) {
+ try {
+ serverSocket_.setSoTimeout(0);
+ } catch (SocketException sx) {
+ log.error("SocketException caused by serverSocket in listen()", sx);
+ }
+ }
+ }
+
+ protected TNonblockingSocket acceptImpl() throws TTransportException {
+ if (serverSocket_ == null) {
+ throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+ }
+ try {
+ SocketChannel socketChannel = serverSocketChannel.accept();
+ if (socketChannel == null) {
+ return null;
+ }
+
+ TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
+ tsocket.setTimeout(clientTimeout_);
+ return tsocket;
+ } catch (IOException iox) {
+ throw new TTransportException(iox);
+ }
+ }
+
+ public void registerSelector(Selector selector) {
+ try {
+ // Register the server socket channel, indicating an interest in
+ // accepting new connections
+ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+ } catch (ClosedChannelException e) {
+ // this shouldn't happen, ideally...
+ // TODO: decide what to do with this.
+ }
+ }
+
+ public void close() {
+ if (serverSocket_ != null) {
+ try {
+ serverSocket_.close();
+ } catch (IOException iox) {
+ log.warn("WARNING: Could not close server socket: " + iox.getMessage());
+ }
+ serverSocket_ = null;
+ }
+ }
+
+ public void interrupt() {
+ // The thread-safeness of this is dubious, but Java documentation suggests
+ // that it is safe to do this from a different thread context
+ close();
+ }
+
+ public int getPort() {
+ return serverSocket_.getLocalPort();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
new file mode 100644
index 0000000..ebf2c84
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.lang.reflect.Field;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.net.HostAndPort;
+
+public class TServerUtils {
+ private static final Logger log = Logger.getLogger(TServerUtils.class);
+
+ public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
+
+ /**
+ * Start a server, at the given port, or higher, if that port is not available.
+ *
+ * @param portHintProperty
+ * the port to attempt to open, can be zero, meaning "any available port"
+ * @param processor
+ * the service to be started
+ * @param serverName
+ * the name of the class that is providing the service
+ * @param threadName
+ * name this service's thread for better debugging
+ * @return the server object created, and the port actually used
+ * @throws UnknownHostException
+ * when we don't know our own address
+ */
+ public static ServerAddress startServer(AccumuloServerContext service, String address, Property portHintProperty, TProcessor processor, String serverName,
+ String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
+ throws UnknownHostException {
+ int portHint = service.getConfiguration().getPort(portHintProperty);
+ int minThreads = 2;
+ if (minThreadProperty != null)
+ minThreads = service.getConfiguration().getCount(minThreadProperty);
+ long timeBetweenThreadChecks = 1000;
+ if (timeBetweenThreadChecksProperty != null)
+ timeBetweenThreadChecks = service.getConfiguration().getTimeInMillis(timeBetweenThreadChecksProperty);
+ long maxMessageSize = 10 * 1000 * 1000;
+ if (maxMessageSizeProperty != null)
+ maxMessageSize = service.getConfiguration().getMemoryInBytes(maxMessageSizeProperty);
+ boolean portSearch = false;
+ if (portSearchProperty != null)
+ portSearch = service.getConfiguration().getBoolean(portSearchProperty);
+ // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
+ TimedProcessor timedProcessor = new TimedProcessor(service.getConfiguration(), processor, serverName, threadName);
+ Random random = new Random();
+ for (int j = 0; j < 100; j++) {
+
+ // Are we going to slide around, looking for an open port?
+ int portsToSearch = 1;
+ if (portSearch)
+ portsToSearch = 1000;
+
+ for (int i = 0; i < portsToSearch; i++) {
+ int port = portHint + i;
+ if (portHint != 0 && i > 0)
+ port = 1024 + random.nextInt(65535 - 1024);
+ if (port > 65535)
+ port = 1024 + port % (65535 - 1024);
+ try {
+ HostAndPort addr = HostAndPort.fromParts(address, port);
+ return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads,
+ service.getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), timeBetweenThreadChecks, maxMessageSize,
+ service.getServerSslParams(), service.getClientTimeoutInMillis());
+ } catch (TTransportException ex) {
+ log.error("Unable to start TServer", ex);
+ if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
+ // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less
+ // TTransportException, and with a TSocket created by TSSLTransportFactory, it
+ // comes through as caused by a BindException.
+ log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")");
+ UtilWaitThread.sleep(250);
+ } else {
+ // thrift is passing up a nested exception that isn't a BindException,
+ // so no reason to believe retrying on a different port would help.
+ log.error("Unable to start TServer", ex);
+ break;
+ }
+ }
+ }
+ }
+ throw new UnknownHostException("Unable to find a listen port");
+ }
+
+ /**
+ * Create a NonBlockingServer with a custom thread pool that can dynamically resize itself.
+ */
+ public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, final String serverName, String threadName,
+ final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
+ TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
+ CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);
+ options.protocolFactory(ThriftUtil.protocolFactory());
+ options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+ options.maxReadBufferBytes = maxMessageSize;
+ options.stopTimeoutVal(5);
+ /*
+ * Create our own very special thread pool.
+ */
+ final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
+ // periodically adjust the number of threads we need by checking how busy our threads are
+ SimpleTimer.getInstance(numSTThreads).schedule(new Runnable() {
+ @Override
+ public void run() {
+ if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+ int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
+ log.info("Increasing server thread pool size on " + serverName + " to " + larger);
+ pool.setMaximumPoolSize(larger);
+ pool.setCorePoolSize(larger);
+ } else {
+ if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+ int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
+ if (smaller != pool.getCorePoolSize()) {
+ // ACCUMULO-2997 there is a race condition here... the active count could be higher by the time
+ // we decrease the core pool size... so the active count could end up higher than
+ // the core pool size, in which case everything will be queued... the increase case
+ // should handle this and prevent deadlock
+ log.info("Decreasing server thread pool size on " + serverName + " to " + smaller);
+ pool.setCorePoolSize(smaller);
+ }
+ }
+ }
+ }
+ }, timeBetweenThreadChecks, timeBetweenThreadChecks);
+ options.executorService(pool);
+ options.processorFactory(new TProcessorFactory(processor));
+ if (address.getPort() == 0) {
+ address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
+ }
+ return new ServerAddress(new CustomNonBlockingServer(options), address);
+ }
+
+ public static TServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.protocolFactory(ThriftUtil.protocolFactory());
+ options.transportFactory(ThriftUtil.transportFactory());
+ options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor));
+ return new TThreadPoolServer(options);
+ }
+
+ public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
+ throws TTransportException {
+ org.apache.thrift.transport.TServerSocket transport;
+ try {
+ transport = ThriftUtil.getServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
+ } catch (UnknownHostException e) {
+ throw new TTransportException(e);
+ }
+ if (address.getPort() == 0) {
+ address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+ }
+ return new ServerAddress(createThreadPoolServer(transport, processor), address);
+ }
+
+ public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads,
+ long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+ return startTServer(address, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads,
+ timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout);
+ }
+
+ /**
+ * Start the appropriate Thrift server (SSL or non-blocking server) for the given parameters. Non-null SSL parameters will cause an SSL server to be started.
+ *
+ * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to.
+ */
+ public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads,
+ int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+
+ ServerAddress serverAddress;
+ if (sslParams != null) {
+ serverAddress = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams);
+ } else {
+ serverAddress = createNonBlockingServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
+ }
+ final TServer finalServer = serverAddress.server;
+ Runnable serveTask = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ finalServer.serve();
+ } catch (Error e) {
+ Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
+ }
+ }
+ };
+ serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
+ Thread thread = new Daemon(serveTask, threadName);
+ thread.start();
+ // check for the special "bind to everything address"
+ if (serverAddress.address.getHostText().equals("0.0.0.0")) {
+ // can't get the address from the bind, so we'll do our best to invent our hostname
+ try {
+ serverAddress = new ServerAddress(finalServer, HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort()));
+ } catch (UnknownHostException e) {
+ throw new TTransportException(e);
+ }
+ }
+ return serverAddress;
+ }
+
+ // Existing connections will keep our thread running: reach in with reflection and insist that they shutdown.
+ public static void stopTServer(TServer s) {
+ if (s == null)
+ return;
+ s.stop();
+ try {
+ Field f = s.getClass().getDeclaredField("executorService_");
+ f.setAccessible(true);
+ ExecutorService es = (ExecutorService) f.get(s);
+ es.shutdownNow();
+ } catch (Exception e) {
+ TServerUtils.log.error("Unable to call shutdownNow", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
new file mode 100644
index 0000000..a842572
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.server.metrics.Metrics;
+import org.apache.accumulo.server.metrics.MetricsFactory;
+import org.apache.accumulo.server.metrics.ThriftMetrics;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link TProcessor} which tracks the duration of an RPC and adds it to the metrics subsystem.
+ */
+public class TimedProcessor implements TProcessor {
+ private static final Logger log = LoggerFactory.getLogger(TimedProcessor.class);
+
+ private final TProcessor other;
+ private final Metrics metrics;
+ private long idleStart = 0;
+
+ public TimedProcessor(AccumuloConfiguration conf, TProcessor next, String serverName, String threadName) {
+ this.other = next;
+ // Register the metrics MBean
+ MetricsFactory factory = new MetricsFactory(conf);
+ metrics = factory.createThriftMetrics(serverName, threadName);
+ try {
+ metrics.register();
+ } catch (Exception e) {
+ log.error("Exception registering MBean with MBean Server", e);
+ }
+ idleStart = System.currentTimeMillis();
+ }
+
+ @Override
+ public boolean process(TProtocol in, TProtocol out) throws TException {
+ long now = 0;
+ final boolean metricsEnabled = metrics.isEnabled();
+ if (metricsEnabled) {
+ now = System.currentTimeMillis();
+ metrics.add(ThriftMetrics.idle, (now - idleStart));
+ }
+ try {
+ return other.process(in, out);
+ } finally {
+ if (metricsEnabled) {
+ idleStart = System.currentTimeMillis();
+ metrics.add(ThriftMetrics.execute, idleStart - now);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
index e09f7fd..a2afeac 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
@@ -44,10 +44,10 @@ import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.security.handler.Authenticator;
import org.apache.accumulo.server.security.handler.Authorizor;
import org.apache.accumulo.server.security.handler.PermissionHandler;
-import org.apache.accumulo.server.thrift.TServerUtils;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java
deleted file mode 100644
index 208fde5..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.thrift;
-
-import org.apache.accumulo.core.util.TBufferedSocket;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.TProcessorFactory;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Sets the address of a client in a ThreadLocal to allow for more informative log messages.
- */
-public class ClientInfoProcessorFactory extends TProcessorFactory {
- private static final Logger log = LoggerFactory.getLogger(ClientInfoProcessorFactory.class);
-
- private final ThreadLocal<String> clientAddress;
-
- public ClientInfoProcessorFactory(ThreadLocal<String> clientAddress, TProcessor processor) {
- super(processor);
- this.clientAddress = clientAddress;
- }
-
- @Override
- public TProcessor getProcessor(TTransport trans) {
- if (trans instanceof TBufferedSocket) {
- TBufferedSocket tsock = (TBufferedSocket) trans;
- clientAddress.set(tsock.getClientString());
- } else if (trans instanceof TSocket) {
- TSocket tsock = (TSocket) trans;
- clientAddress.set(tsock.getSocket().getInetAddress().getHostAddress() + ":" + tsock.getSocket().getPort());
- } else {
- log.warn("Unable to extract clientAddress from transport of type {}", trans.getClass());
- }
- return super.getProcessor(trans);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
deleted file mode 100644
index a96f7b5..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.thrift;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.nio.channels.SelectionKey;
-import java.util.Iterator;
-
-import org.apache.log4j.Logger;
-import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.transport.TNonblockingTransport;
-import org.apache.thrift.transport.TTransportException;
-
-/**
- * This class implements a custom non-blocking thrift server, incorporating the {@link THsHaServer} features, and overriding the underlying
- * {@link TNonblockingServer} methods, especially {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread}, in order to override the
- * {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} with
- * one that reveals the client address from its transport.
- *
- * <p>
- * The justification for this is explained in https://issues.apache.org/jira/browse/ACCUMULO-1691, and is needed due to the repeated regressions:
- * <ul>
- * <li>https://issues.apache.org/jira/browse/THRIFT-958</li>
- * <li>https://issues.apache.org/jira/browse/THRIFT-1464</li>
- * <li>https://issues.apache.org/jira/browse/THRIFT-2173</li>
- * </ul>
- *
- * <p>
- * This class contains a copy of {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread} from Thrift 0.9.1, with the slight modification of
- * instantiating a custom FrameBuffer, rather than the {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and
- * {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer}. Because of this, any change in the implementation upstream will require a review
- * of this implementation here, to ensure any new bugfixes/features in the upstream Thrift class are also applied here, at least until
- * https://issues.apache.org/jira/browse/THRIFT-2173 is implemented. In the meantime, the maven-enforcer-plugin ensures that Thrift remains at version 0.9.1,
- * which has been reviewed and tested.
- */
-public class CustomNonBlockingServer extends THsHaServer {
-
- private static final Logger LOGGER = Logger.getLogger(CustomNonBlockingServer.class);
- private SelectAcceptThread selectAcceptThread_;
- private volatile boolean stopped_ = false;
-
- public CustomNonBlockingServer(Args args) {
- super(args);
- }
-
- @Override
- protected Runnable getRunnable(final FrameBuffer frameBuffer) {
- return new Runnable() {
- @Override
- public void run() {
- if (frameBuffer instanceof CustomNonblockingFrameBuffer) {
- TNonblockingTransport trans = ((CustomNonblockingFrameBuffer) frameBuffer).getTransport();
- if (trans instanceof TNonblockingSocket) {
- TNonblockingSocket tsock = (TNonblockingSocket) trans;
- Socket sock = tsock.getSocketChannel().socket();
- TServerUtils.clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
- }
- }
- frameBuffer.invoke();
- }
- };
- }
-
- @Override
- protected boolean startThreads() {
- // start the selector
- try {
- selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport) serverTransport_);
- selectAcceptThread_.start();
- return true;
- } catch (IOException e) {
- LOGGER.error("Failed to start selector thread!", e);
- return false;
- }
- }
-
- @Override
- public void stop() {
- stopped_ = true;
- if (selectAcceptThread_ != null) {
- selectAcceptThread_.wakeupSelector();
- }
- }
-
- @Override
- public boolean isStopped() {
- return selectAcceptThread_.isStopped();
- }
-
- @Override
- protected void joinSelector() {
- // wait until the selector thread exits
- try {
- selectAcceptThread_.join();
- } catch (InterruptedException e) {
- // for now, just silently ignore. technically this means we'll have less of
- // a graceful shutdown as a result.
- }
- }
-
- private interface CustomNonblockingFrameBuffer {
- TNonblockingTransport getTransport();
- }
-
- private class CustomAsyncFrameBuffer extends AsyncFrameBuffer implements CustomNonblockingFrameBuffer {
- private TNonblockingTransport trans;
-
- public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
- super(trans, selectionKey, selectThread);
- this.trans = trans;
- }
-
- @Override
- public TNonblockingTransport getTransport() {
- return trans;
- }
- }
-
- private class CustomFrameBuffer extends FrameBuffer implements CustomNonblockingFrameBuffer {
- private TNonblockingTransport trans;
-
- public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
- super(trans, selectionKey, selectThread);
- this.trans = trans;
- }
-
- @Override
- public TNonblockingTransport getTransport() {
- return trans;
- }
- }
-
- // @formatter:off
- private class SelectAcceptThread extends AbstractSelectThread {
-
- // The server transport on which new client transports will be accepted
- private final TNonblockingServerTransport serverTransport;
-
- /**
- * Set up the thread that will handle the non-blocking accepts, reads, and
- * writes.
- */
- public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
- throws IOException {
- this.serverTransport = serverTransport;
- serverTransport.registerSelector(selector);
- }
-
- public boolean isStopped() {
- return stopped_;
- }
-
- /**
- * The work loop. Handles both selecting (all IO operations) and managing
- * the selection preferences of all existing connections.
- */
- @Override
- public void run() {
- try {
- if (eventHandler_ != null) {
- eventHandler_.preServe();
- }
-
- while (!stopped_) {
- select();
- processInterestChanges();
- }
- for (SelectionKey selectionKey : selector.keys()) {
- cleanupSelectionKey(selectionKey);
- }
- } catch (Throwable t) {
- LOGGER.error("run() exiting due to uncaught error", t);
- } finally {
- stopped_ = true;
- }
- }
-
- /**
- * Select and process IO events appropriately:
- * If there are connections to be accepted, accept them.
- * If there are existing connections with data waiting to be read, read it,
- * buffering until a whole frame has been read.
- * If there are any pending responses, buffer them until their target client
- * is available, and then send the data.
- */
- private void select() {
- try {
- // wait for io events.
- selector.select();
-
- // process the io events we received
- Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
- while (!stopped_ && selectedKeys.hasNext()) {
- SelectionKey key = selectedKeys.next();
- selectedKeys.remove();
-
- // skip if not valid
- if (!key.isValid()) {
- cleanupSelectionKey(key);
- continue;
- }
-
- // if the key is marked Accept, then it has to be the server
- // transport.
- if (key.isAcceptable()) {
- handleAccept();
- } else if (key.isReadable()) {
- // deal with reads
- handleRead(key);
- } else if (key.isWritable()) {
- // deal with writes
- handleWrite(key);
- } else {
- LOGGER.warn("Unexpected state in select! " + key.interestOps());
- }
- }
- } catch (IOException e) {
- LOGGER.warn("Got an IOException while selecting!", e);
- }
- }
-
- /**
- * Accept a new connection.
- */
- @SuppressWarnings("unused")
- private void handleAccept() throws IOException {
- SelectionKey clientKey = null;
- TNonblockingTransport client = null;
- try {
- // accept the connection
- client = (TNonblockingTransport)serverTransport.accept();
- clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
-
- // add this key to the map
- FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
- new CustomAsyncFrameBuffer(client, clientKey,SelectAcceptThread.this) :
- new CustomFrameBuffer(client, clientKey,SelectAcceptThread.this);
-
- clientKey.attach(frameBuffer);
- } catch (TTransportException tte) {
- // something went wrong accepting.
- LOGGER.warn("Exception trying to accept!", tte);
- tte.printStackTrace();
- if (clientKey != null) cleanupSelectionKey(clientKey);
- if (client != null) client.close();
- }
- }
- } // SelectAcceptThread
- // @formatter:on
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
deleted file mode 100644
index 53ed709..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.thrift;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-
-import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
-import org.apache.accumulo.core.trace.wrappers.TraceWrap;
-import org.apache.thrift.TApplicationException;
-import org.apache.thrift.TException;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class accommodates the changes in THRIFT-1805, which appeared in Thrift 0.9.1 and restricts client-side notification of server-side errors to
- * {@link TException} only, by wrapping {@link RuntimeException} and {@link Error} as {@link TException}, so it doesn't just close the connection and look like
- * a network issue, but informs the client that a {@link TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs similar functions as
- * {@link TraceWrap}, but with the additional action of translating exceptions. See also ACCUMULO-1691 and ACCUMULO-2950.
- *
- * @since 1.6.1
- */
-public class RpcWrapper {
-
- public static <T> T service(final T instance) {
- InvocationHandler handler = new RpcServerInvocationHandler<T>(instance) {
- @Override
- public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
- try {
- return super.invoke(obj, method, args);
- } catch (RuntimeException e) {
- String msg = e.getMessage();
- LoggerFactory.getLogger(instance.getClass()).error(msg, e);
- throw new TException(msg);
- } catch (Error e) {
- String msg = e.getMessage();
- LoggerFactory.getLogger(instance.getClass()).error(msg, e);
- throw new TException(msg);
- }
- }
- };
-
- @SuppressWarnings("unchecked")
- T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler);
- return proxiedInstance;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java
deleted file mode 100644
index f52951e..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.thrift;
-
-import org.apache.thrift.server.TServer;
-
-import com.google.common.net.HostAndPort;
-
-/**
- * Encapsulate a Thrift server and the address, host and port, to which it is bound.
- */
-public class ServerAddress {
- public final TServer server;
- public final HostAndPort address;
-
- public ServerAddress(TServer server, HostAndPort address) {
- this.server = server;
- this.address = address;
- }
-
- public TServer getServer() {
- return server;
- }
-
- public HostAndPort getAddress() {
- return address;
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
deleted file mode 100644
index 5534313..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.thrift;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-
-import org.apache.accumulo.core.util.TBufferedSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-// Thrift-959 removed the small buffer from TSocket; this adds it back for servers
-public class TBufferedServerSocket extends TServerTransport {
-
- // expose acceptImpl
- static class TServerSocket extends org.apache.thrift.transport.TServerSocket {
- public TServerSocket(ServerSocket serverSocket) {
- super(serverSocket);
- }
-
- public TSocket acceptImplPublic() throws TTransportException {
- return acceptImpl();
- }
- }
-
- final TServerSocket impl;
- final int bufferSize;
-
- public TBufferedServerSocket(ServerSocket serverSocket, int bufferSize) {
- this.impl = new TServerSocket(serverSocket);
- this.bufferSize = bufferSize;
- }
-
- @Override
- public void listen() throws TTransportException {
- impl.listen();
- }
-
- @Override
- public void close() {
- impl.close();
- }
-
- // Wrap accepted sockets using buffered IO
- @Override
- protected TTransport acceptImpl() throws TTransportException {
- TSocket sock = impl.acceptImplPublic();
- try {
- return new TBufferedSocket(sock, this.bufferSize);
- } catch (IOException e) {
- throw new TTransportException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
deleted file mode 100644
index 77c5ca6..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.accumulo.server.thrift;
-
-import org.apache.log4j.Logger;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.transport.TTransportException;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-/**
- * Wrapper around ServerSocketChannel.
- *
- * This class is copied from org.apache.thrift.transport.TNonblockingServerSocket version 0.9.
- * The only change (apart from the logging statements) is the addition of the {@link #getPort()} method to retrieve the port used by the ServerSocket.
- */
-public class TNonblockingServerSocket extends TNonblockingServerTransport {
- private static final Logger log = Logger.getLogger(TNonblockingServerTransport.class.getName());
-
- /**
- * This channel is where all the nonblocking magic happens.
- */
- private ServerSocketChannel serverSocketChannel = null;
-
- /**
- * Underlying ServerSocket object
- */
- private ServerSocket serverSocket_ = null;
-
- /**
- * Timeout for client sockets from accept
- */
- private int clientTimeout_ = 0;
-
- /**
- * Creates just a port listening server socket
- */
- public TNonblockingServerSocket(int port) throws TTransportException {
- this(port, 0);
- }
-
- /**
- * Creates just a port listening server socket
- */
- public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
- this(new InetSocketAddress(port), clientTimeout);
- }
-
- public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
- this(bindAddr, 0);
- }
-
- public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
- clientTimeout_ = clientTimeout;
- try {
- serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(false);
-
- // Make server socket
- serverSocket_ = serverSocketChannel.socket();
- // Prevent 2MSL delay problem on server restarts
- serverSocket_.setReuseAddress(true);
- // Bind to listening port
- serverSocket_.bind(bindAddr);
- } catch (IOException ioe) {
- serverSocket_ = null;
- throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
- }
- }
-
- public void listen() throws TTransportException {
- // Make sure not to block on accept
- if (serverSocket_ != null) {
- try {
- serverSocket_.setSoTimeout(0);
- } catch (SocketException sx) {
- log.error("SocketException caused by serverSocket in listen()", sx);
- }
- }
- }
-
- protected TNonblockingSocket acceptImpl() throws TTransportException {
- if (serverSocket_ == null) {
- throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
- }
- try {
- SocketChannel socketChannel = serverSocketChannel.accept();
- if (socketChannel == null) {
- return null;
- }
-
- TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
- tsocket.setTimeout(clientTimeout_);
- return tsocket;
- } catch (IOException iox) {
- throw new TTransportException(iox);
- }
- }
-
- public void registerSelector(Selector selector) {
- try {
- // Register the server socket channel, indicating an interest in
- // accepting new connections
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- } catch (ClosedChannelException e) {
- // this shouldn't happen, ideally...
- // TODO: decide what to do with this.
- }
- }
-
- public void close() {
- if (serverSocket_ != null) {
- try {
- serverSocket_.close();
- } catch (IOException iox) {
- log.warn("WARNING: Could not close server socket: " + iox.getMessage());
- }
- serverSocket_ = null;
- }
- }
-
- public void interrupt() {
- // The thread-safeness of this is dubious, but Java documentation suggests
- // that it is safe to do this from a different thread context
- close();
- }
-
- public int getPort() {
- return serverSocket_.getLocalPort();
- }
-}