You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/12/09 04:11:54 UTC
[1/5] accumulo git commit: ACCUMULO-3394 Update some documentation.
Repository: accumulo
Updated Branches:
refs/heads/master ab4cc79ac -> b7927906f
ACCUMULO-3394 Update some documentation.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/79fa912f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/79fa912f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/79fa912f
Branch: refs/heads/master
Commit: 79fa912fc1c9b2b4d255ab5d3d9f7ba9d9c3e971
Parents: 6e368ed
Author: Josh Elser <el...@apache.org>
Authored: Mon Dec 8 21:17:54 2014 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Dec 8 22:04:17 2014 -0500
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/server/thrift/TServerUtils.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/79fa912f/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
index 4e6a758..123caab 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
@@ -123,6 +123,9 @@ public class TServerUtils {
throw new UnknownHostException("Unable to find a listen port");
}
+ /**
+ * Create a NonBlockingServer with a custom thread pool that can dynamically resize itself.
+ */
public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, final String serverName, String threadName,
final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
[2/5] accumulo git commit: ACCUMULO-3394 Remove unused method.
Posted by el...@apache.org.
ACCUMULO-3394 Remove unused method.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6e368edb
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6e368edb
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6e368edb
Branch: refs/heads/master
Commit: 6e368edb8cd5eb5a68e75f45352a8fdfacaedc1e
Parents: f25d850
Author: Josh Elser <el...@apache.org>
Authored: Mon Dec 8 21:17:41 2014 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Dec 8 22:04:17 2014 -0500
----------------------------------------------------------------------
.../accumulo/server/thrift/TServerUtils.java | 20 --------------------
1 file changed, 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e368edb/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
index ca182d4..4e6a758 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
@@ -16,14 +16,11 @@
*/
package org.apache.accumulo.server.thrift;
-import java.io.IOException;
import java.lang.reflect.Field;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.ServerSocket;
import java.net.UnknownHostException;
-import java.nio.channels.ServerSocketChannel;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -170,23 +167,6 @@ public class TServerUtils {
return new ServerAddress(new CustomNonBlockingServer(options), address);
}
- public static ServerAddress createThreadPoolServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads)
- throws TTransportException {
-
- // if port is zero, then we must bind to get the port number
- ServerSocket sock;
- try {
- sock = ServerSocketChannel.open().socket();
- sock.setReuseAddress(true);
- sock.bind(new InetSocketAddress(address.getHostText(), address.getPort()));
- address = HostAndPort.fromParts(address.getHostText(), sock.getLocalPort());
- } catch (IOException ex) {
- throw new TTransportException(ex);
- }
- TServerTransport transport = new TBufferedServerSocket(sock, 32 * 1024);
- return new ServerAddress(createThreadPoolServer(transport, processor), address);
- }
-
public static TServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
options.protocolFactory(ThriftUtil.protocolFactory());
[3/5] accumulo git commit: ACCUMULO-3394 Consolidate thrift related classes in one package.
Posted by el...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 371c94d..bbb6306 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -92,9 +92,9 @@ import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.fs.VolumeUtil;
import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.thrift.RpcWrapper;
+import org.apache.accumulo.server.thrift.TServerUtils;
import org.apache.accumulo.server.util.Halt;
-import org.apache.accumulo.server.util.RpcWrapper;
-import org.apache.accumulo.server.util.TServerUtils;
import org.apache.accumulo.server.util.TabletIterator;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.hadoop.fs.FileStatus;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index de00041..9f6a2c0 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -122,12 +122,12 @@ import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.security.handler.ZKPermHandler;
import org.apache.accumulo.server.tables.TableManager;
import org.apache.accumulo.server.tables.TableObserver;
+import org.apache.accumulo.server.thrift.RpcWrapper;
+import org.apache.accumulo.server.thrift.ServerAddress;
+import org.apache.accumulo.server.thrift.TServerUtils;
import org.apache.accumulo.server.util.DefaultMap;
import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.server.util.MetadataTableUtil;
-import org.apache.accumulo.server.util.RpcWrapper;
-import org.apache.accumulo.server.util.TServerUtils;
-import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
import org.apache.accumulo.server.util.TableInfoUtil;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.server.zookeeper.ZooLock;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index c45d5cd..d05f8d1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -173,13 +173,13 @@ import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.replication.ZooKeeperInitialization;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.thrift.RpcWrapper;
+import org.apache.accumulo.server.thrift.ServerAddress;
+import org.apache.accumulo.server.thrift.TServerUtils;
import org.apache.accumulo.server.util.FileSystemMonitor;
import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.server.util.MasterMetadataUtil;
import org.apache.accumulo.server.util.MetadataTableUtil;
-import org.apache.accumulo.server.util.RpcWrapper;
-import org.apache.accumulo.server.util.TServerUtils;
-import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
import org.apache.accumulo.server.util.time.RelativeTime;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index 8f85fc2..3762a83 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -17,7 +17,7 @@
package org.apache.accumulo.tserver.session;
import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.thrift.TServerUtils;
public class Session {
public final String client;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 4094c5f..8e14bf2 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -40,8 +40,8 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
-import org.apache.accumulo.server.util.TServerUtils;
-import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
+import org.apache.accumulo.server.thrift.ServerAddress;
+import org.apache.accumulo.server.thrift.TServerUtils;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -54,23 +54,23 @@ import com.google.common.net.HostAndPort;
* Tablet server that creates a lock in zookeeper, responds to one status request, and then hangs on subsequent requests. Exits with code zero if halted.
*/
public class ZombieTServer {
-
+
public static class ThriftClientHandler extends org.apache.accumulo.test.performance.thrift.NullTserver.ThriftClientHandler {
-
+
int statusCount = 0;
-
+
boolean halted = false;
ThriftClientHandler(AccumuloServerContext context, TransactionWatcher watcher) {
super(context, watcher);
}
-
+
@Override
synchronized public void fastHalt(TInfo tinfo, TCredentials credentials, String lock) {
halted = true;
notifyAll();
}
-
+
@Override
public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
synchronized (this) {
@@ -83,35 +83,35 @@ public class ZombieTServer {
UtilWaitThread.sleep(Integer.MAX_VALUE);
return null;
}
-
+
@Override
synchronized public void halt(TInfo tinfo, TCredentials credentials, String lock) throws ThriftSecurityException, TException {
halted = true;
notifyAll();
}
-
+
}
private static final Logger log = Logger.getLogger(ZombieTServer.class);
-
+
public static void main(String[] args) throws Exception {
Random random = new Random(System.currentTimeMillis() % 1000);
int port = random.nextInt(30000) + 2000;
AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance()));
-
+
TransactionWatcher watcher = new TransactionWatcher();
final ThriftClientHandler tch = new ThriftClientHandler(context, watcher);
Processor<Iface> processor = new Processor<Iface>(tch);
ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", port), processor, "ZombieTServer", "walking dead", 2, 1, 1000,
10 * 1024 * 1024, null, -1);
-
+
String addressString = serverPort.address.toString();
String zPath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addressString;
ZooReaderWriter zoo = ZooReaderWriter.getInstance();
zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP);
-
+
ZooLock zlock = new ZooLock(zPath);
-
+
LockWatcher lw = new LockWatcher() {
@Override
public void lostLock(final LockLossReason reason) {
@@ -122,7 +122,7 @@ public class ZombieTServer {
System.exit(1);
}
}
-
+
@Override
public void unableToMonitorLockNode(Throwable e) {
try {
@@ -133,7 +133,7 @@ public class ZombieTServer {
}
}
};
-
+
byte[] lockContent = new ServerServices(addressString, Service.TSERV_CLIENT).toString().getBytes(UTF_8);
if (zlock.tryLock(lw, lockContent)) {
log.debug("Obtained tablet server lock " + zlock.getLockPath());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index 671ead6..b311358 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -69,7 +69,7 @@ import org.apache.accumulo.server.master.state.MetaDataStateStore;
import org.apache.accumulo.server.master.state.MetaDataTableScanner;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletLocationState;
-import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.thrift.TServerUtils;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
[4/5] accumulo git commit: ACCUMULO-3394 Consolidate thrift related classes in one package.
Posted by el...@apache.org.
ACCUMULO-3394 Consolidate thrift related classes in one package.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f25d8505
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f25d8505
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f25d8505
Branch: refs/heads/master
Commit: f25d850561c4da093222621ae0ea5f1ab8079dd0
Parents: ab4cc79
Author: Josh Elser <el...@apache.org>
Authored: Mon Dec 8 17:34:32 2014 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Dec 8 22:04:17 2014 -0500
----------------------------------------------------------------------
.../java/org/apache/accumulo/proxy/Proxy.java | 2 +-
.../security/AuditedSecurityOperation.java | 2 +-
.../thrift/ClientInfoProcessorFactory.java | 53 +++
.../server/thrift/CustomNonBlockingServer.java | 268 +++++++++++++++
.../accumulo/server/thrift/RpcWrapper.java | 62 ++++
.../accumulo/server/thrift/ServerAddress.java | 42 +++
.../server/thrift/TBufferedServerSocket.java | 71 ++++
.../server/thrift/TNonblockingServerSocket.java | 157 +++++++++
.../accumulo/server/thrift/TServerUtils.java | 272 +++++++++++++++
.../accumulo/server/thrift/TimedProcessor.java | 69 ++++
.../server/util/CustomNonBlockingServer.java | 268 ---------------
.../apache/accumulo/server/util/RpcWrapper.java | 62 ----
.../server/util/TBufferedServerSocket.java | 71 ----
.../server/util/TNonblockingServerSocket.java | 157 ---------
.../accumulo/server/util/TServerUtils.java | 342 -------------------
.../accumulo/server/util/TServerUtilsTest.java | 3 +
.../accumulo/gc/SimpleGarbageCollector.java | 4 +-
.../java/org/apache/accumulo/master/Master.java | 6 +-
.../apache/accumulo/tserver/TabletServer.java | 6 +-
.../accumulo/tserver/session/Session.java | 2 +-
.../accumulo/test/functional/ZombieTServer.java | 32 +-
.../test/performance/thrift/NullTserver.java | 2 +-
22 files changed, 1025 insertions(+), 928 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
index 972eee7..0a7b301 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@ -27,7 +27,7 @@ import org.apache.accumulo.core.cli.Help;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.proxy.thrift.AccumuloProxy;
-import org.apache.accumulo.server.util.RpcWrapper;
+import org.apache.accumulo.server.thrift.RpcWrapper;
import org.apache.log4j.Logger;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
index e473822..e09f7fd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
@@ -47,7 +47,7 @@ import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.security.handler.Authenticator;
import org.apache.accumulo.server.security.handler.Authorizor;
import org.apache.accumulo.server.security.handler.PermissionHandler;
-import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.thrift.TServerUtils;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java
new file mode 100644
index 0000000..208fde5
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/ClientInfoProcessorFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import org.apache.accumulo.core.util.TBufferedSocket;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sets the address of a client in a ThreadLocal to allow for more informative log messages.
+ */
+public class ClientInfoProcessorFactory extends TProcessorFactory {
+ private static final Logger log = LoggerFactory.getLogger(ClientInfoProcessorFactory.class);
+
+ private final ThreadLocal<String> clientAddress;
+
+ public ClientInfoProcessorFactory(ThreadLocal<String> clientAddress, TProcessor processor) {
+ super(processor);
+ this.clientAddress = clientAddress;
+ }
+
+ @Override
+ public TProcessor getProcessor(TTransport trans) {
+ if (trans instanceof TBufferedSocket) {
+ TBufferedSocket tsock = (TBufferedSocket) trans;
+ clientAddress.set(tsock.getClientString());
+ } else if (trans instanceof TSocket) {
+ TSocket tsock = (TSocket) trans;
+ clientAddress.set(tsock.getSocket().getInetAddress().getHostAddress() + ":" + tsock.getSocket().getPort());
+ } else {
+ log.warn("Unable to extract clientAddress from transport of type {}", trans.getClass());
+ }
+ return super.getProcessor(trans);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
new file mode 100644
index 0000000..ceb0a42
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * This class implements a custom non-blocking thrift server, incorporating the {@link THsHaServer} features, and overriding the underlying
+ * {@link TNonblockingServer} methods, especially {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread}, in order to override the
+ * {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} with
+ * one that reveals the client address from its transport.
+ *
+ * <p>
+ * The justification for this is explained in https://issues.apache.org/jira/browse/ACCUMULO-1691, and is needed due to the repeated regressions:
+ * <ul>
+ * <li>https://issues.apache.org/jira/browse/THRIFT-958</li>
+ * <li>https://issues.apache.org/jira/browse/THRIFT-1464</li>
+ * <li>https://issues.apache.org/jira/browse/THRIFT-2173</li>
+ * </ul>
+ *
+ * <p>
+ * This class contains a copy of {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread} from Thrift 0.9.1, with the slight modification of
+ * instantiating a custom FrameBuffer, rather than the {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and
+ * {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer}. Because of this, any change in the implementation upstream will require a review
+ * of this implementation here, to ensure any new bugfixes/features in the upstream Thrift class are also applied here, at least until
+ * https://issues.apache.org/jira/browse/THRIFT-2173 is implemented. In the meantime, the maven-enforcer-plugin ensures that Thrift remains at version 0.9.1,
+ * which has been reviewed and tested.
+ */
+public class CustomNonBlockingServer extends THsHaServer {
+
+ private static final Logger LOGGER = Logger.getLogger(CustomNonBlockingServer.class);
+ private SelectAcceptThread selectAcceptThread_;
+ private volatile boolean stopped_ = false;
+
+ public CustomNonBlockingServer(Args args) {
+ super(args);
+ }
+
+ @Override
+ protected Runnable getRunnable(final FrameBuffer frameBuffer) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ if (frameBuffer instanceof CustomNonblockingFrameBuffer) {
+ TNonblockingTransport trans = ((CustomNonblockingFrameBuffer) frameBuffer).getTransport();
+ if (trans instanceof TNonblockingSocket) {
+ TNonblockingSocket tsock = (TNonblockingSocket) trans;
+ Socket sock = tsock.getSocketChannel().socket();
+ TServerUtils.clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
+ }
+ }
+ frameBuffer.invoke();
+ }
+ };
+ }
+
+ @Override
+ protected boolean startThreads() {
+ // start the selector
+ try {
+ selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport) serverTransport_);
+ selectAcceptThread_.start();
+ return true;
+ } catch (IOException e) {
+ LOGGER.error("Failed to start selector thread!", e);
+ return false;
+ }
+ }
+
+ @Override
+ public void stop() {
+ stopped_ = true;
+ if (selectAcceptThread_ != null) {
+ selectAcceptThread_.wakeupSelector();
+ }
+ }
+
+ @Override
+ public boolean isStopped() {
+ return selectAcceptThread_.isStopped();
+ }
+
+ @Override
+ protected void joinSelector() {
+ // wait until the selector thread exits
+ try {
+ selectAcceptThread_.join();
+ } catch (InterruptedException e) {
+ // for now, just silently ignore. technically this means we'll have less of
+ // a graceful shutdown as a result.
+ }
+ }
+
+ private interface CustomNonblockingFrameBuffer {
+ TNonblockingTransport getTransport();
+ }
+
+ private class CustomAsyncFrameBuffer extends AsyncFrameBuffer implements CustomNonblockingFrameBuffer {
+ private TNonblockingTransport trans;
+
+ public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+ super(trans, selectionKey, selectThread);
+ this.trans = trans;
+ }
+
+ @Override
+ public TNonblockingTransport getTransport() {
+ return trans;
+ }
+ }
+
+ private class CustomFrameBuffer extends FrameBuffer implements CustomNonblockingFrameBuffer {
+ private TNonblockingTransport trans;
+
+ public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+ super(trans, selectionKey, selectThread);
+ this.trans = trans;
+ }
+
+ @Override
+ public TNonblockingTransport getTransport() {
+ return trans;
+ }
+ }
+
+ // @formatter:off
+ private class SelectAcceptThread extends AbstractSelectThread {
+
+ // The server transport on which new client transports will be accepted
+ private final TNonblockingServerTransport serverTransport;
+
+ /**
+ * Set up the thread that will handle the non-blocking accepts, reads, and
+ * writes.
+ */
+ public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
+ throws IOException {
+ this.serverTransport = serverTransport;
+ serverTransport.registerSelector(selector);
+ }
+
+ public boolean isStopped() {
+ return stopped_;
+ }
+
+ /**
+ * The work loop. Handles both selecting (all IO operations) and managing
+ * the selection preferences of all existing connections.
+ */
+ @Override
+ public void run() {
+ try {
+ if (eventHandler_ != null) {
+ eventHandler_.preServe();
+ }
+
+ while (!stopped_) {
+ select();
+ processInterestChanges();
+ }
+ for (SelectionKey selectionKey : selector.keys()) {
+ cleanupSelectionKey(selectionKey);
+ }
+ } catch (Throwable t) {
+ LOGGER.error("run() exiting due to uncaught error", t);
+ } finally {
+ stopped_ = true;
+ }
+ }
+
+ /**
+ * Select and process IO events appropriately:
+ * If there are connections to be accepted, accept them.
+ * If there are existing connections with data waiting to be read, read it,
+ * buffering until a whole frame has been read.
+ * If there are any pending responses, buffer them until their target client
+ * is available, and then send the data.
+ */
+ private void select() {
+ try {
+ // wait for io events.
+ selector.select();
+
+ // process the io events we received
+ Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+ while (!stopped_ && selectedKeys.hasNext()) {
+ SelectionKey key = selectedKeys.next();
+ selectedKeys.remove();
+
+ // skip if not valid
+ if (!key.isValid()) {
+ cleanupSelectionKey(key);
+ continue;
+ }
+
+ // if the key is marked Accept, then it has to be the server
+ // transport.
+ if (key.isAcceptable()) {
+ handleAccept();
+ } else if (key.isReadable()) {
+ // deal with reads
+ handleRead(key);
+ } else if (key.isWritable()) {
+ // deal with writes
+ handleWrite(key);
+ } else {
+ LOGGER.warn("Unexpected state in select! " + key.interestOps());
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Got an IOException while selecting!", e);
+ }
+ }
+
+ /**
+ * Accept a new connection.
+ */
+ @SuppressWarnings("unused")
+ private void handleAccept() throws IOException {
+ SelectionKey clientKey = null;
+ TNonblockingTransport client = null;
+ try {
+ // accept the connection
+ client = (TNonblockingTransport)serverTransport.accept();
+ clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
+
+ // add this key to the map
+ FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
+ new CustomAsyncFrameBuffer(client, clientKey,SelectAcceptThread.this) :
+ new CustomFrameBuffer(client, clientKey,SelectAcceptThread.this);
+
+ clientKey.attach(frameBuffer);
+ } catch (TTransportException tte) {
+ // something went wrong accepting.
+ LOGGER.warn("Exception trying to accept!", tte);
+ tte.printStackTrace();
+ if (clientKey != null) cleanupSelectionKey(clientKey);
+ if (client != null) client.close();
+ }
+ }
+ } // SelectAcceptThread
+ // @formatter:on
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
new file mode 100644
index 0000000..db1863b
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
+import org.apache.accumulo.core.trace.wrappers.TraceWrap;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class accommodates the changes in THRIFT-1805, which appeared in Thrift 0.9.1 and restricts client-side notification of server-side errors to
+ * {@link TException} only, by wrapping {@link RuntimeException} and {@link Error} as {@link TException}, so it doesn't just close the connection and look like
+ * a network issue, but informs the client that a {@link TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs similar functions as
+ * {@link TraceWrap}, but with the additional action of translating exceptions. See also ACCUMULO-1691 and ACCUMULO-2950.
+ *
+ * @since 1.6.1
+ */
+public class RpcWrapper {
+
+  /**
+   * Wrap a service implementation in a dynamic proxy which logs any {@link RuntimeException} or {@link Error} raised by an invocation and rethrows it as a
+   * {@link TException}. The original throwable is attached as the cause so its stack trace is not lost to whoever handles the {@link TException}.
+   *
+   * @param instance
+   *          the service implementation to wrap; the proxy exposes the interfaces returned by {@code instance.getClass().getInterfaces()}
+   * @return a proxy for {@code instance}
+   */
+  public static <T> T service(final T instance) {
+    InvocationHandler handler = new RpcServerInvocationHandler<T>(instance) {
+      @Override
+      public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
+        try {
+          return super.invoke(obj, method, args);
+        } catch (RuntimeException e) {
+          String msg = e.getMessage();
+          LoggerFactory.getLogger(instance.getClass()).error(msg, e);
+          // keep the original failure as the cause instead of discarding it
+          throw new TException(msg, e);
+        } catch (Error e) {
+          String msg = e.getMessage();
+          LoggerFactory.getLogger(instance.getClass()).error(msg, e);
+          // keep the original failure as the cause instead of discarding it
+          throw new TException(msg, e);
+        }
+      }
+    };
+
+    @SuppressWarnings("unchecked")
+    T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler);
+    return proxiedInstance;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java
new file mode 100644
index 0000000..f52951e
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/ServerAddress.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import org.apache.thrift.server.TServer;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Pairs a Thrift server with the host and port to which it is bound.
+ */
+public class ServerAddress {
+  public final TServer server;
+  public final HostAndPort address;
+
+  /**
+   * @param server
+   *          the Thrift server
+   * @param address
+   *          the host and port associated with {@code server}
+   */
+  public ServerAddress(TServer server, HostAndPort address) {
+    this.address = address;
+    this.server = server;
+  }
+
+  /**
+   * @return the host and port given at construction
+   */
+  public HostAndPort getAddress() {
+    return this.address;
+  }
+
+  /**
+   * @return the Thrift server given at construction
+   */
+  public TServer getServer() {
+    return this.server;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
new file mode 100644
index 0000000..bf35bdf
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+import org.apache.accumulo.core.util.TBufferedSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * Server transport which wraps every accepted socket in a {@link TBufferedSocket}. Thrift-959 removed the small buffer from TSocket; this adds it back for
+ * servers.
+ */
+public class TBufferedServerSocket extends TServerTransport {
+
+  // expose acceptImpl
+  static class TServerSocket extends org.apache.thrift.transport.TServerSocket {
+    public TServerSocket(ServerSocket serverSocket) {
+      super(serverSocket);
+    }
+
+    public TSocket acceptImplPublic() throws TTransportException {
+      return acceptImpl();
+    }
+  }
+
+  final TServerSocket impl;
+  final int bufferSize;
+
+  /**
+   * @param serverSocket
+   *          the server socket from which connections are accepted
+   * @param bufferSize
+   *          the buffer size handed to each {@link TBufferedSocket} created for an accepted connection
+   */
+  public TBufferedServerSocket(ServerSocket serverSocket, int bufferSize) {
+    this.impl = new TServerSocket(serverSocket);
+    this.bufferSize = bufferSize;
+  }
+
+  @Override
+  public void listen() throws TTransportException {
+    impl.listen();
+  }
+
+  @Override
+  public void close() {
+    impl.close();
+  }
+
+  /**
+   * Accept a connection and wrap it using buffered IO.
+   *
+   * @throws TTransportException
+   *           if accepting fails, or if the buffered wrapper cannot be created (in which case the accepted socket is closed first)
+   */
+  @Override
+  protected TTransport acceptImpl() throws TTransportException {
+    TSocket sock = impl.acceptImplPublic();
+    try {
+      return new TBufferedSocket(sock, this.bufferSize);
+    } catch (IOException e) {
+      // nobody else holds a reference to the accepted socket, so close it here rather than leak it
+      sock.close();
+      throw new TTransportException(e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
new file mode 100644
index 0000000..f4b2df4
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.accumulo.server.thrift;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Wrapper around ServerSocketChannel.
+ *
+ * This class is copied from org.apache.thrift.transport.TNonblockingServerSocket version 0.9.
+ * Apart from the logging statements, the changes are the addition of the {@link #getPort()} method to retrieve the port used by the ServerSocket, and
+ * releasing the channel when the constructor fails to bind.
+ */
+public class TNonblockingServerSocket extends TNonblockingServerTransport {
+  private static final Logger log = Logger.getLogger(TNonblockingServerTransport.class.getName());
+
+  /**
+   * This channel is where all the nonblocking magic happens.
+   */
+  private ServerSocketChannel serverSocketChannel = null;
+
+  /**
+   * Underlying ServerSocket object
+   */
+  private ServerSocket serverSocket_ = null;
+
+  /**
+   * Timeout for client sockets from accept
+   */
+  private int clientTimeout_ = 0;
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port) throws TTransportException {
+    this(port, 0);
+  }
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
+    this(new InetSocketAddress(port), clientTimeout);
+  }
+
+  /**
+   * Creates a server socket bound to the given address, with no client timeout.
+   */
+  public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
+    this(bindAddr, 0);
+  }
+
+  /**
+   * Opens a non-blocking server socket channel and binds it to the given address.
+   *
+   * @param bindAddr
+   *          address to bind to
+   * @param clientTimeout
+   *          timeout applied to each socket returned from accept
+   * @throws TTransportException
+   *           if the channel cannot be opened, configured or bound. The exception intentionally carries no cause: TServerUtils.startServer treats a
+   *           cause-less TTransportException as "port taken" and retries.
+   */
+  public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
+    clientTimeout_ = clientTimeout;
+    try {
+      serverSocketChannel = ServerSocketChannel.open();
+      serverSocketChannel.configureBlocking(false);
+
+      // Make server socket
+      serverSocket_ = serverSocketChannel.socket();
+      // Prevent 2MSL delay problem on server restarts
+      serverSocket_.setReuseAddress(true);
+      // Bind to listening port
+      serverSocket_.bind(bindAddr);
+    } catch (IOException ioe) {
+      serverSocket_ = null;
+      // the underlying reason is not attached to the exception thrown below, so record it here
+      log.debug("Could not create ServerSocket on address " + bindAddr.toString(), ioe);
+      // release the channel; callers may retry many times and would otherwise leak one open channel per attempt
+      if (serverSocketChannel != null) {
+        try {
+          serverSocketChannel.close();
+        } catch (IOException closeFailure) {
+          log.warn("WARNING: Could not close server socket channel: " + closeFailure.getMessage());
+        }
+      }
+      throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
+    }
+  }
+
+  @Override
+  public void listen() throws TTransportException {
+    // Make sure not to block on accept
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.setSoTimeout(0);
+      } catch (SocketException sx) {
+        log.error("SocketException caused by serverSocket in listen()", sx);
+      }
+    }
+  }
+
+  /**
+   * Accept a pending connection, if there is one.
+   *
+   * @return the accepted socket with the client timeout applied, or null if the channel had no connection to accept
+   * @throws TTransportException
+   *           if this transport has been closed, or the accept fails
+   */
+  @Override
+  protected TNonblockingSocket acceptImpl() throws TTransportException {
+    if (serverSocket_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+    }
+    try {
+      SocketChannel socketChannel = serverSocketChannel.accept();
+      if (socketChannel == null) {
+        return null;
+      }
+
+      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
+      tsocket.setTimeout(clientTimeout_);
+      return tsocket;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  @Override
+  public void registerSelector(Selector selector) {
+    try {
+      // Register the server socket channel, indicating an interest in
+      // accepting new connections
+      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+    } catch (ClosedChannelException e) {
+      // this shouldn't happen, ideally...
+      // TODO: decide what to do with this. At least make the failure visible.
+      log.warn("Could not register server socket channel with selector; channel is closed", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.close();
+      } catch (IOException iox) {
+        log.warn("WARNING: Could not close server socket: " + iox.getMessage());
+      }
+      serverSocket_ = null;
+    }
+  }
+
+  @Override
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
+  /**
+   * @return the local port of the underlying ServerSocket. Must not be called after {@link #close()}, which clears the underlying socket reference.
+   */
+  public int getPort() {
+    return serverSocket_.getLocalPort();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
new file mode 100644
index 0000000..ca182d4
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/TServerUtils.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.SslConnectionParams;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.net.HostAndPort;
+
+public class TServerUtils {
+ private static final Logger log = Logger.getLogger(TServerUtils.class);
+
+ public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
+
+ /**
+  * Start a server on the configured port. When port search is enabled and the configured port is unavailable, other ports are tried: consecutive ports when
+  * the configured port is zero, otherwise randomly chosen ports between 1024 and 65534.
+  *
+  * @param service
+  *          the server context supplying configuration, SSL parameters and the client timeout
+  * @param address
+  *          the host name or address to bind to
+  * @param portHintProperty
+  *          the property holding the port to attempt to open, can be zero, meaning "any available port"
+  * @param processor
+  *          the service to be started
+  * @param serverName
+  *          the name of the class that is providing the service
+  * @param threadName
+  *          name this service's thread for better debugging
+  * @param portSearchProperty
+  *          the property which enables searching for another port; may be null, in which case no search is done
+  * @param minThreadProperty
+  *          the property holding the minimum number of threads; may be null, in which case 2 is used
+  * @param timeBetweenThreadChecksProperty
+  *          the property holding the interval between thread pool size checks; may be null, in which case 1000 ms is used
+  * @param maxMessageSizeProperty
+  *          the property holding the maximum message size; may be null, in which case 10,000,000 bytes is used
+  * @return the server object created, and the port actually used
+  * @throws UnknownHostException
+  *           when no server could be started, either because no port could be bound or because startup failed for a reason unrelated to the port
+  */
+ public static ServerAddress startServer(AccumuloServerContext service, String address, Property portHintProperty, TProcessor processor, String serverName,
+     String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
+     throws UnknownHostException {
+   int portHint = service.getConfiguration().getPort(portHintProperty);
+   int minThreads = 2;
+   if (minThreadProperty != null)
+     minThreads = service.getConfiguration().getCount(minThreadProperty);
+   long timeBetweenThreadChecks = 1000;
+   if (timeBetweenThreadChecksProperty != null)
+     timeBetweenThreadChecks = service.getConfiguration().getTimeInMillis(timeBetweenThreadChecksProperty);
+   long maxMessageSize = 10 * 1000 * 1000;
+   if (maxMessageSizeProperty != null)
+     maxMessageSize = service.getConfiguration().getMemoryInBytes(maxMessageSizeProperty);
+   boolean portSearch = false;
+   if (portSearchProperty != null)
+     portSearch = service.getConfiguration().getBoolean(portSearchProperty);
+   // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
+   TimedProcessor timedProcessor = new TimedProcessor(service.getConfiguration(), processor, serverName, threadName);
+   Random random = new Random();
+   attempts: for (int j = 0; j < 100; j++) {
+
+     // Are we going to slide around, looking for an open port?
+     int portsToSearch = 1;
+     if (portSearch)
+       portsToSearch = 1000;
+
+     for (int i = 0; i < portsToSearch; i++) {
+       int port = portHint + i;
+       if (portHint != 0 && i > 0)
+         port = 1024 + random.nextInt(65535 - 1024);
+       if (port > 65535)
+         port = 1024 + port % (65535 - 1024);
+       try {
+         HostAndPort addr = HostAndPort.fromParts(address, port);
+         return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads,
+             service.getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), timeBetweenThreadChecks, maxMessageSize,
+             service.getServerSslParams(), service.getClientTimeoutInMillis());
+       } catch (TTransportException ex) {
+         log.error("Unable to start TServer", ex);
+         if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
+           // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less
+           // TTransportException, and with a TSocket created by TSSLTransportFactory, it
+           // comes through as caused by a BindException.
+           log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")");
+           UtilWaitThread.sleep(250);
+         } else {
+           // thrift is passing up a nested exception that isn't a BindException,
+           // so no reason to believe retrying on a different port would help.
+           // The failure was already logged above; leave both loops so we do not retry (and re-log) pointlessly.
+           break attempts;
+         }
+       }
+     }
+   }
+   throw new UnknownHostException("Unable to find a listen port");
+ }
+
+ /**
+ * Create (but do not start) a CustomNonBlockingServer bound to the given address, backed by a thread pool whose size is re-evaluated periodically.
+ * <p>
+ * Every timeBetweenThreadChecks a scheduled task grows the pool by at most two threads when all core threads are active, and shrinks it by one thread (never
+ * below numThreads) when more than three core threads are not active. The threadName parameter is not used by this method.
+ *
+ * @param address
+ * host and port to bind to; a port of zero is replaced in the returned ServerAddress by the port reported by the transport
+ * @param numThreads
+ * size passed to the thread pool on creation, and the lower bound when shrinking
+ * @param numSTThreads
+ * passed to SimpleTimer.getInstance when scheduling the resize task
+ * @param maxMessageSize
+ * used for both the transport factory and the maximum read buffer size
+ * @return the server and the address it is bound to
+ */
+ public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, final String serverName, String threadName,
+ final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
+ TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
+ CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);
+ options.protocolFactory(ThriftUtil.protocolFactory());
+ options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+ options.maxReadBufferBytes = maxMessageSize;
+ options.stopTimeoutVal(5);
+ /*
+ * Create our own very special thread pool.
+ */
+ final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
+ // periodically adjust the number of threads we need by checking how busy our threads are
+ SimpleTimer.getInstance(numSTThreads).schedule(new Runnable() {
+ @Override
+ public void run() {
+ if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+ // NOTE(review): when the queue is empty, larger equals the current core size, yet the "Increasing" message is still logged
+ int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
+ log.info("Increasing server thread pool size on " + serverName + " to " + larger);
+ // raise the maximum first so that it is never below the new core size
+ pool.setMaximumPoolSize(larger);
+ pool.setCorePoolSize(larger);
+ } else {
+ if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+ int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
+ if (smaller != pool.getCorePoolSize()) {
+ // ACCUMULO-2997 there is a race condition here... the active count could be higher by the time
+ // we decrease the core pool size... so the active count could end up higher than
+ // the core pool size, in which case everything will be queued... the increase case
+ // should handle this and prevent deadlock
+ log.info("Decreasing server thread pool size on " + serverName + " to " + smaller);
+ pool.setCorePoolSize(smaller);
+ }
+ }
+ }
+ }
+ }, timeBetweenThreadChecks, timeBetweenThreadChecks);
+ options.executorService(pool);
+ options.processorFactory(new TProcessorFactory(processor));
+ // the transport is already bound, so a requested port of zero can be resolved to the real port now
+ if (address.getPort() == 0) {
+ address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
+ }
+ return new ServerAddress(new CustomNonBlockingServer(options), address);
+ }
+
+ /**
+  * Create (but do not start) a thread pool server listening on the given address, using a buffered server socket. The serverName, threadName and
+  * numThreads parameters are not used by this method.
+  *
+  * @param address
+  *          host and port to bind to; the returned ServerAddress carries the port actually bound
+  * @param processor
+  *          the service to be served
+  * @return the server and the address it is bound to
+  * @throws TTransportException
+  *           if the socket cannot be opened or bound; the socket is closed before the exception is thrown
+  */
+ public static ServerAddress createThreadPoolServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads)
+     throws TTransportException {
+
+   // if port is zero, then we must bind to get the port number
+   ServerSocket sock = null;
+   try {
+     sock = ServerSocketChannel.open().socket();
+     sock.setReuseAddress(true);
+     sock.bind(new InetSocketAddress(address.getHostText(), address.getPort()));
+     address = HostAndPort.fromParts(address.getHostText(), sock.getLocalPort());
+   } catch (IOException ex) {
+     // don't leak the socket (and its channel) when configuring or binding fails
+     if (sock != null) {
+       try {
+         sock.close();
+       } catch (IOException closeFailure) {
+         log.warn("Could not close server socket after failed bind: " + closeFailure.getMessage());
+       }
+     }
+     throw new TTransportException(ex);
+   }
+   TServerTransport transport = new TBufferedServerSocket(sock, 32 * 1024);
+   return new ServerAddress(createThreadPoolServer(transport, processor), address);
+ }
+
+ /**
+  * Build (but do not start) a TThreadPoolServer on the given transport, using the protocol and transport factories from ThriftUtil and a processor factory
+  * constructed with the {@link #clientAddress} thread local.
+  *
+  * @param transport
+  *          the server transport to accept connections from
+  * @param processor
+  *          the service to be served
+  * @return the configured server
+  */
+ public static TServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
+   TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(transport);
+   serverArgs.protocolFactory(ThriftUtil.protocolFactory());
+   serverArgs.transportFactory(ThriftUtil.transportFactory());
+   ClientInfoProcessorFactory clientInfoFactory = new ClientInfoProcessorFactory(clientAddress, processor);
+   serverArgs.processorFactory(clientInfoFactory);
+   TServer threadPoolServer = new TThreadPoolServer(serverArgs);
+   return threadPoolServer;
+ }
+
+ public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
+ throws TTransportException {
+ org.apache.thrift.transport.TServerSocket transport;
+ try {
+ transport = ThriftUtil.getServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
+ } catch (UnknownHostException e) {
+ throw new TTransportException(e);
+ }
+ if (address.getPort() == 0) {
+ address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+ }
+ return new ServerAddress(createThreadPoolServer(transport, processor), address);
+ }
+
+ /**
+  * Wrap the processor in a new {@link TimedProcessor} built from the given configuration, then start a server for it by delegating to the overload which
+  * accepts a {@link TimedProcessor}.
+  *
+  * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to.
+  */
+ public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads,
+     long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+   TimedProcessor timed = new TimedProcessor(conf, processor, serverName, threadName);
+   return startTServer(address, timed, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize, sslParams,
+       sslSocketTimeout);
+ }
+
+ /**
+  * Create the appropriate Thrift server for the given parameters and run it on a new daemon thread. Non-null SSL parameters select the SSL thread pool
+  * server; otherwise the non-blocking server is used.
+  * <p>
+  * When the server was bound to the wildcard address "0.0.0.0", the returned address uses the local host name instead.
+  *
+  * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to.
+  * @throws TTransportException
+  *           if the server cannot be created, or the local host name cannot be determined for a wildcard bind
+  */
+ public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads,
+     int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+
+   final ServerAddress created;
+   if (sslParams == null) {
+     created = createNonBlockingServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
+   } else {
+     created = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams);
+   }
+   final TServer tserver = created.server;
+
+   // serve on a daemon thread; an Error escaping serve() halts the process
+   Runnable serveLoop = new LoggingRunnable(TServerUtils.log, new Runnable() {
+     @Override
+     public void run() {
+       try {
+         tserver.serve();
+       } catch (Error e) {
+         Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
+       }
+     }
+   });
+   Thread serveThread = new Daemon(serveLoop, threadName);
+   serveThread.start();
+
+   // anything other than the special "bind to everything address" can be reported as-is
+   if (!created.address.getHostText().equals("0.0.0.0")) {
+     return created;
+   }
+   // can't get the address from the bind, so we'll do our best to invent our hostname
+   try {
+     String localHostName = InetAddress.getLocalHost().getHostName();
+     return new ServerAddress(tserver, HostAndPort.fromParts(localHostName, created.address.getPort()));
+   } catch (UnknownHostException e) {
+     throw new TTransportException(e);
+   }
+ }
+
+ /**
+  * Stop the given server, then shut down its executor immediately. Existing connections will keep our thread running, so this reaches in with reflection
+  * for a field named "executorService_" declared on the server's own class and insists that it shut down. Any failure to do so is logged, not thrown.
+  *
+  * @param s
+  *          the server to stop; null is ignored
+  */
+ public static void stopTServer(TServer s) {
+   if (s != null) {
+     s.stop();
+     try {
+       Field executorField = s.getClass().getDeclaredField("executorService_");
+       executorField.setAccessible(true);
+       ExecutorService executor = (ExecutorService) executorField.get(s);
+       executor.shutdownNow();
+     } catch (Exception e) {
+       TServerUtils.log.error("Unable to call shutdownNow", e);
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/thrift/TimedProcessor.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TimedProcessor.java
new file mode 100644
index 0000000..56c235c
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/TimedProcessor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.thrift;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.server.metrics.Metrics;
+import org.apache.accumulo.server.metrics.MetricsFactory;
+import org.apache.accumulo.server.metrics.ThriftMetrics;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link TProcessor} decorator which records, in the metrics subsystem, how long each RPC takes to execute and how long the processor sat idle beforehand.
+ */
+public class TimedProcessor implements TProcessor {
+  private static final Logger log = LoggerFactory.getLogger(TimedProcessor.class);
+
+  private final TProcessor other;
+  private final Metrics metrics;
+  private long idleStart = 0;
+
+  /**
+   * @param conf
+   *          configuration handed to the {@link MetricsFactory}
+   * @param next
+   *          the processor to delegate each call to
+   * @param serverName
+   *          passed to the factory when creating the Thrift metrics
+   * @param threadName
+   *          passed to the factory when creating the Thrift metrics
+   */
+  public TimedProcessor(AccumuloConfiguration conf, TProcessor next, String serverName, String threadName) {
+    other = next;
+    // Create and register the metrics MBean; a registration failure is logged but does not prevent construction
+    metrics = new MetricsFactory(conf).createThriftMetrics(serverName, threadName);
+    try {
+      metrics.register();
+    } catch (Exception e) {
+      log.error("Exception registering MBean with MBean Server", e);
+    }
+    idleStart = System.currentTimeMillis();
+  }
+
+  /**
+   * Delegate to the wrapped processor. When metrics are enabled, the idle time since the previous call finished is recorded before delegating and the
+   * execution time is recorded afterwards, whether or not the delegate threw.
+   */
+  @Override
+  public boolean process(TProtocol in, TProtocol out) throws TException {
+    final boolean timed = metrics.isEnabled();
+    long started = 0;
+    if (timed) {
+      started = System.currentTimeMillis();
+      metrics.add(ThriftMetrics.idle, (started - idleStart));
+    }
+    try {
+      boolean result = other.process(in, out);
+      return result;
+    } finally {
+      if (timed) {
+        // the end of this call is also the start of the next idle period
+        idleStart = System.currentTimeMillis();
+        metrics.add(ThriftMetrics.execute, idleStart - started);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java
deleted file mode 100644
index 0f01068..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.util;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.nio.channels.SelectionKey;
-import java.util.Iterator;
-
-import org.apache.log4j.Logger;
-import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.transport.TNonblockingTransport;
-import org.apache.thrift.transport.TTransportException;
-
-/**
- * This class implements a custom non-blocking thrift server, incorporating the {@link THsHaServer} features, and overriding the underlying
- * {@link TNonblockingServer} methods, especially {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread}, in order to override the
- * {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} with
- * one that reveals the client address from its transport.
- *
- * <p>
- * The justification for this is explained in https://issues.apache.org/jira/browse/ACCUMULO-1691, and is needed due to the repeated regressions:
- * <ul>
- * <li>https://issues.apache.org/jira/browse/THRIFT-958</li>
- * <li>https://issues.apache.org/jira/browse/THRIFT-1464</li>
- * <li>https://issues.apache.org/jira/browse/THRIFT-2173</li>
- * </ul>
- *
- * <p>
- * This class contains a copy of {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread} from Thrift 0.9.1, with the slight modification of
- * instantiating a custom FrameBuffer, rather than the {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and
- * {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer}. Because of this, any change in the implementation upstream will require a review
- * of this implementation here, to ensure any new bugfixes/features in the upstream Thrift class are also applied here, at least until
- * https://issues.apache.org/jira/browse/THRIFT-2173 is implemented. In the meantime, the maven-enforcer-plugin ensures that Thrift remains at version 0.9.1,
- * which has been reviewed and tested.
- */
-public class CustomNonBlockingServer extends THsHaServer {
-
-  private static final Logger LOGGER = Logger.getLogger(CustomNonBlockingServer.class);
-  // Created by startThreads(); null until then
-  private SelectAcceptThread selectAcceptThread_;
-  // Set by stop(), and by the selector thread when its work loop exits
-  private volatile boolean stopped_ = false;
-
-  public CustomNonBlockingServer(Args args) {
-    super(args);
-  }
-
-  /**
-   * Wraps the invocation of a frame buffer so that, when the buffer is one of the custom frame buffers over a {@link TNonblockingSocket}, the remote
-   * "host:port" is stored in {@link TServerUtils#clientAddress} before the request is invoked.
-   */
-  @Override
-  protected Runnable getRunnable(final FrameBuffer frameBuffer) {
-    return new Runnable() {
-      @Override
-      public void run() {
-        if (frameBuffer instanceof CustomNonblockingFrameBuffer) {
-          TNonblockingTransport trans = ((CustomNonblockingFrameBuffer) frameBuffer).getTransport();
-          if (trans instanceof TNonblockingSocket) {
-            TNonblockingSocket tsock = (TNonblockingSocket) trans;
-            Socket sock = tsock.getSocketChannel().socket();
-            TServerUtils.clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
-          }
-        }
-        frameBuffer.invoke();
-      }
-    };
-  }
-
-  /**
-   * Creates and starts the select/accept thread.
-   *
-   * @return true if the thread was started, false if registering the selector failed
-   */
-  @Override
-  protected boolean startThreads() {
-    // start the selector
-    try {
-      selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport) serverTransport_);
-      selectAcceptThread_.start();
-      return true;
-    } catch (IOException e) {
-      LOGGER.error("Failed to start selector thread!", e);
-      return false;
-    }
-  }
-
-  /**
-   * Flags the server as stopped and wakes the selector so that the work loop notices the flag.
-   */
-  @Override
-  public void stop() {
-    stopped_ = true;
-    if (selectAcceptThread_ != null) {
-      selectAcceptThread_.wakeupSelector();
-    }
-  }
-
-  /**
-   * @return whether the server has been stopped; safe to call before the selector thread has been created
-   */
-  @Override
-  public boolean isStopped() {
-    // The selector thread only ever reported this same flag; reading it directly avoids a
-    // NullPointerException when called before startThreads() has created the thread.
-    return stopped_;
-  }
-
-  @Override
-  protected void joinSelector() {
-    // wait until the selector thread exits
-    try {
-      selectAcceptThread_.join();
-    } catch (InterruptedException e) {
-      // technically this means we'll have less of a graceful shutdown as a result;
-      // keep the interrupt visible to the caller instead of discarding it.
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Implemented by the custom frame buffers so the transport they were created with can be retrieved.
-   */
-  private interface CustomNonblockingFrameBuffer {
-    TNonblockingTransport getTransport();
-  }
-
-  /**
-   * An {@link AsyncFrameBuffer} that remembers the transport it was created with.
-   */
-  private class CustomAsyncFrameBuffer extends AsyncFrameBuffer implements CustomNonblockingFrameBuffer {
-    private final TNonblockingTransport trans;
-
-    public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
-      super(trans, selectionKey, selectThread);
-      this.trans = trans;
-    }
-
-    @Override
-    public TNonblockingTransport getTransport() {
-      return trans;
-    }
-  }
-
-  /**
-   * A {@link FrameBuffer} that remembers the transport it was created with.
-   */
-  private class CustomFrameBuffer extends FrameBuffer implements CustomNonblockingFrameBuffer {
-    private final TNonblockingTransport trans;
-
-    public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
-      super(trans, selectionKey, selectThread);
-      this.trans = trans;
-    }
-
-    @Override
-    public TNonblockingTransport getTransport() {
-      return trans;
-    }
-  }
-
-  // @formatter:off
-  private class SelectAcceptThread extends AbstractSelectThread {
-
-    // The server transport on which new client transports will be accepted
-    private final TNonblockingServerTransport serverTransport;
-
-    /**
-     * Set up the thread that will handle the non-blocking accepts, reads, and
-     * writes.
-     */
-    public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
-        throws IOException {
-      this.serverTransport = serverTransport;
-      serverTransport.registerSelector(selector);
-    }
-
-    public boolean isStopped() {
-      return stopped_;
-    }
-
-    /**
-     * The work loop. Handles both selecting (all IO operations) and managing
-     * the selection preferences of all existing connections.
-     */
-    @Override
-    public void run() {
-      try {
-        if (eventHandler_ != null) {
-          eventHandler_.preServe();
-        }
-
-        while (!stopped_) {
-          select();
-          processInterestChanges();
-        }
-        for (SelectionKey selectionKey : selector.keys()) {
-          cleanupSelectionKey(selectionKey);
-        }
-      } catch (Throwable t) {
-        LOGGER.error("run() exiting due to uncaught error", t);
-      } finally {
-        // however the loop ended, report the server as stopped
-        stopped_ = true;
-      }
-    }
-
-    /**
-     * Select and process IO events appropriately:
-     * If there are connections to be accepted, accept them.
-     * If there are existing connections with data waiting to be read, read it,
-     * buffering until a whole frame has been read.
-     * If there are any pending responses, buffer them until their target client
-     * is available, and then send the data.
-     */
-    private void select() {
-      try {
-        // wait for io events.
-        selector.select();
-
-        // process the io events we received
-        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
-        while (!stopped_ && selectedKeys.hasNext()) {
-          SelectionKey key = selectedKeys.next();
-          selectedKeys.remove();
-
-          // skip if not valid
-          if (!key.isValid()) {
-            cleanupSelectionKey(key);
-            continue;
-          }
-
-          // if the key is marked Accept, then it has to be the server
-          // transport.
-          if (key.isAcceptable()) {
-            handleAccept();
-          } else if (key.isReadable()) {
-            // deal with reads
-            handleRead(key);
-          } else if (key.isWritable()) {
-            // deal with writes
-            handleWrite(key);
-          } else {
-            LOGGER.warn("Unexpected state in select! " + key.interestOps());
-          }
-        }
-      } catch (IOException e) {
-        LOGGER.warn("Got an IOException while selecting!", e);
-      }
-    }
-
-    /**
-     * Accept a new connection and attach a custom frame buffer to its selection key.
-     */
-    @SuppressWarnings("unused")
-    private void handleAccept() throws IOException {
-      SelectionKey clientKey = null;
-      TNonblockingTransport client = null;
-      try {
-        // accept the connection
-        client = (TNonblockingTransport) serverTransport.accept();
-        clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
-
-        // add this key to the map
-        FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
-            new CustomAsyncFrameBuffer(client, clientKey, SelectAcceptThread.this) :
-            new CustomFrameBuffer(client, clientKey, SelectAcceptThread.this);
-
-        clientKey.attach(frameBuffer);
-      } catch (TTransportException tte) {
-        // something went wrong accepting; the logger already records the stack trace
-        LOGGER.warn("Exception trying to accept!", tte);
-        if (clientKey != null) cleanupSelectionKey(clientKey);
-        if (client != null) client.close();
-      }
-    }
-  } // SelectAcceptThread
-  // @formatter:on
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java
deleted file mode 100644
index 2464a15..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.util;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-
-import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
-import org.apache.accumulo.core.trace.wrappers.TraceWrap;
-import org.apache.thrift.TApplicationException;
-import org.apache.thrift.TException;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class accommodates the changes in THRIFT-1805, which appeared in Thrift 0.9.1 and restricts client-side notification of server-side errors to
- * {@link TException} only, by wrapping {@link RuntimeException} and {@link Error} as {@link TException}, so it doesn't just close the connection and look like
- * a network issue, but informs the client that a {@link TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs similar functions as
- * {@link TraceWrap}, but with the additional action of translating exceptions. See also ACCUMULO-1691 and ACCUMULO-2950.
- *
- * @since 1.6.1
- */
-public class RpcWrapper {
-
- public static <T> T service(final T instance) {
- InvocationHandler handler = new RpcServerInvocationHandler<T>(instance) {
- @Override
- public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
- try {
- return super.invoke(obj, method, args);
- } catch (RuntimeException e) {
- String msg = e.getMessage();
- LoggerFactory.getLogger(instance.getClass()).error(msg, e);
- throw new TException(msg);
- } catch (Error e) {
- String msg = e.getMessage();
- LoggerFactory.getLogger(instance.getClass()).error(msg, e);
- throw new TException(msg);
- }
- }
- };
-
- @SuppressWarnings("unchecked")
- T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler);
- return proxiedInstance;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/util/TBufferedServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TBufferedServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/util/TBufferedServerSocket.java
deleted file mode 100644
index 2962a52..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TBufferedServerSocket.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.util;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-
-import org.apache.accumulo.core.util.TBufferedSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-/**
- * Thrift-959 removed the small buffer from TSocket; this adds it back for servers. Accepted sockets are wrapped in a {@link TBufferedSocket} of a fixed
- * buffer size; listening and closing are delegated to a regular Thrift server socket.
- */
-public class TBufferedServerSocket extends TServerTransport {
-
-  // expose acceptImpl
-  static class TServerSocket extends org.apache.thrift.transport.TServerSocket {
-    public TServerSocket(ServerSocket serverSocket) {
-      super(serverSocket);
-    }
-
-    public TSocket acceptImplPublic() throws TTransportException {
-      return acceptImpl();
-    }
-  }
-
-  final TServerSocket impl;
-  final int bufferSize;
-
-  /**
-   * @param serverSocket
-   *          the socket that connections are accepted from
-   * @param bufferSize
-   *          buffer size handed to every {@link TBufferedSocket} created for an accepted connection
-   */
-  public TBufferedServerSocket(ServerSocket serverSocket, int bufferSize) {
-    this.impl = new TServerSocket(serverSocket);
-    this.bufferSize = bufferSize;
-  }
-
-  @Override
-  public void listen() throws TTransportException {
-    impl.listen();
-  }
-
-  @Override
-  public void close() {
-    impl.close();
-  }
-
-  /**
-   * Accepts a connection and wraps it using buffered IO.
-   *
-   * @throws TTransportException
-   *           if accepting fails, or if the buffered wrapper cannot be created; in the latter case the accepted socket is closed first
-   */
-  @Override
-  protected TTransport acceptImpl() throws TTransportException {
-    TSocket sock = impl.acceptImplPublic();
-    try {
-      return new TBufferedSocket(sock, this.bufferSize);
-    } catch (IOException e) {
-      // nobody else holds a reference to the accepted socket, so release it here
-      sock.close();
-      throw new TTransportException(e);
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
deleted file mode 100644
index d1cdd8e..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.accumulo.server.util;
-
-import org.apache.log4j.Logger;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.transport.TTransportException;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-/**
- * Wrapper around ServerSocketChannel.
- *
- * This class is copied from org.apache.thrift.transport.TNonblockingServerSocket version 0.9.
- * The only change (apart from the logging statements) is the addition of the {@link #getPort()} method to retrieve the port used by the ServerSocket.
- */
-public class TNonblockingServerSocket extends TNonblockingServerTransport {
-  private static final Logger log = Logger.getLogger(TNonblockingServerTransport.class.getName());
-
-  /**
-   * This channel is where all the nonblocking magic happens.
-   */
-  private ServerSocketChannel serverSocketChannel = null;
-
-  /**
-   * Underlying ServerSocket object
-   */
-  private ServerSocket serverSocket_ = null;
-
-  /**
-   * Timeout for client sockets from accept
-   */
-  private int clientTimeout_ = 0;
-
-  /**
-   * Creates just a port listening server socket
-   */
-  public TNonblockingServerSocket(int port) throws TTransportException {
-    this(port, 0);
-  }
-
-  /**
-   * Creates just a port listening server socket
-   */
-  public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
-    this(new InetSocketAddress(port), clientTimeout);
-  }
-
-  /**
-   * Creates a server socket bound to the given address, with a client timeout of zero.
-   */
-  public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
-    this(bindAddr, 0);
-  }
-
-  /**
-   * Opens a non-blocking server socket channel and binds it to the given address.
-   *
-   * @param bindAddr
-   *          address to bind to
-   * @param clientTimeout
-   *          timeout set on every client socket returned from accept
-   * @throws TTransportException
-   *           if the channel cannot be opened, configured or bound; the channel is closed again before the exception is thrown
-   */
-  public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
-    clientTimeout_ = clientTimeout;
-    try {
-      serverSocketChannel = ServerSocketChannel.open();
-      serverSocketChannel.configureBlocking(false);
-
-      // Make server socket
-      serverSocket_ = serverSocketChannel.socket();
-      // Prevent 2MSL delay problem on server restarts
-      serverSocket_.setReuseAddress(true);
-      // Bind to listening port
-      serverSocket_.bind(bindAddr);
-    } catch (IOException ioe) {
-      serverSocket_ = null;
-      // Release the channel opened above; otherwise every failed bind leaks a descriptor
-      if (serverSocketChannel != null) {
-        try {
-          serverSocketChannel.close();
-        } catch (IOException closeFailure) {
-          log.debug("Could not close server socket channel after failed bind", closeFailure);
-        }
-      }
-      log.debug("Could not create ServerSocket on address " + bindAddr.toString() + ".", ioe);
-      // NOTE: do not attach ioe as the cause without also updating TServerUtils.startServer, which treats a
-      // cause-less TTransportException as "port in use" and retries on another port.
-      throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
-    }
-  }
-
-  @Override
-  public void listen() throws TTransportException {
-    // Make sure not to block on accept
-    if (serverSocket_ != null) {
-      try {
-        serverSocket_.setSoTimeout(0);
-      } catch (SocketException sx) {
-        log.error("SocketException caused by serverSocket in listen()", sx);
-      }
-    }
-  }
-
-  /**
-   * Accepts a pending connection, if any.
-   *
-   * @return the accepted socket with the client timeout applied, or null when the channel had no connection to accept
-   * @throws TTransportException
-   *           if this socket has been closed, or accepting fails
-   */
-  @Override
-  protected TNonblockingSocket acceptImpl() throws TTransportException {
-    if (serverSocket_ == null) {
-      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
-    }
-    try {
-      SocketChannel socketChannel = serverSocketChannel.accept();
-      if (socketChannel == null) {
-        return null;
-      }
-
-      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
-      tsocket.setTimeout(clientTimeout_);
-      return tsocket;
-    } catch (IOException iox) {
-      throw new TTransportException(iox);
-    }
-  }
-
-  @Override
-  public void registerSelector(Selector selector) {
-    try {
-      // Register the server socket channel, indicating an interest in
-      // accepting new connections
-      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
-    } catch (ClosedChannelException e) {
-      // this shouldn't happen, ideally... make it visible rather than dropping it silently
-      // TODO: decide what to do with this.
-      log.warn("Could not register selector: server socket channel is closed", e);
-    }
-  }
-
-  @Override
-  public void close() {
-    if (serverSocket_ != null) {
-      try {
-        serverSocket_.close();
-      } catch (IOException iox) {
-        log.warn("WARNING: Could not close server socket: " + iox.getMessage());
-      }
-      serverSocket_ = null;
-    }
-  }
-
-  @Override
-  public void interrupt() {
-    // The thread-safeness of this is dubious, but Java documentation suggests
-    // that it is safe to do this from a different thread context
-    close();
-  }
-
-  /**
-   * @return the local port of the underlying server socket; must not be called after {@link #close()}, which clears the socket reference
-   */
-  public int getPort() {
-    return serverSocket_.getLocalPort();
-  }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
deleted file mode 100644
index f1156d4..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.util;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.UnknownHostException;
-import java.nio.channels.ServerSocketChannel;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
-import org.apache.accumulo.core.util.SimpleThreadPool;
-import org.apache.accumulo.core.util.SslConnectionParams;
-import org.apache.accumulo.core.util.TBufferedSocket;
-import org.apache.accumulo.core.util.ThriftUtil;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.metrics.Metrics;
-import org.apache.accumulo.server.metrics.MetricsFactory;
-import org.apache.accumulo.server.metrics.ThriftMetrics;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.TProcessorFactory;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-import com.google.common.net.HostAndPort;
-
-public class TServerUtils {
- private static final Logger log = Logger.getLogger(TServerUtils.class);
-
- public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
-
- /**
-  * Immutable pairing of a Thrift server with the host and port associated with it.
-  */
- public static class ServerAddress {
-   public final TServer server;
-   public final HostAndPort address;
-
-   /**
-    * @param server
-    *          the Thrift server
-    * @param address
-    *          the host and port that belong to {@code server}
-    */
-   public ServerAddress(TServer server, HostAndPort address) {
-     this.address = address;
-     this.server = server;
-   }
- }
-
- /**
-  * Start a server on the configured port, optionally searching for another port when that one cannot be bound.
-  *
-  * <p>
-  * Up to 100 rounds are attempted. Each round tries the hinted port first and, when port search is enabled, up to 999 further ports: random ports in the
-  * unprivileged range when the hint is non-zero, otherwise {@code hint + i}.
-  *
-  * @param service
-  *          context supplying the configuration, the server SSL parameters and the client timeout
-  * @param address
-  *          the host name or address to listen on
-  * @param portHintProperty
-  *          property holding the port to attempt to open, can be zero, meaning "any available port"
-  * @param processor
-  *          the service to be started
-  * @param serverName
-  *          the name of the class that is providing the service
-  * @param threadName
-  *          name this service's thread for better debugging
-  * @param portSearchProperty
-  *          boolean property enabling the port search; null means no search
-  * @param minThreadProperty
-  *          property holding the minimum number of threads; null means 2
-  * @param timeBetweenThreadChecksProperty
-  *          property holding the interval, in milliseconds, between thread pool size checks; null means 1000
-  * @param maxMessageSizeProperty
-  *          property holding the maximum message size in bytes; null means 10,000,000
-  * @return the server object created, and the port actually used
-  * @throws UnknownHostException
-  *           when no port could be bound after all attempts
-  */
- public static ServerAddress startServer(AccumuloServerContext service, String address, Property portHintProperty, TProcessor processor, String serverName,
-     String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
-     throws UnknownHostException {
-   int portHint = service.getConfiguration().getPort(portHintProperty);
-   int minThreads = 2;
-   if (minThreadProperty != null)
-     minThreads = service.getConfiguration().getCount(minThreadProperty);
-   long timeBetweenThreadChecks = 1000;
-   if (timeBetweenThreadChecksProperty != null)
-     timeBetweenThreadChecks = service.getConfiguration().getTimeInMillis(timeBetweenThreadChecksProperty);
-   long maxMessageSize = 10 * 1000 * 1000;
-   if (maxMessageSizeProperty != null)
-     maxMessageSize = service.getConfiguration().getMemoryInBytes(maxMessageSizeProperty);
-   boolean portSearch = false;
-   if (portSearchProperty != null)
-     portSearch = service.getConfiguration().getBoolean(portSearchProperty);
-   // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
-   TServerUtils.TimedProcessor timedProcessor = new TServerUtils.TimedProcessor(service.getConfiguration(), processor, serverName, threadName);
-   Random random = new Random();
-   // Are we going to slide around, looking for an open port?
-   final int portsToSearch = portSearch ? 1000 : 1;
-   for (int j = 0; j < 100; j++) {
-
-     for (int i = 0; i < portsToSearch; i++) {
-       int port = portHint + i;
-       // after the first attempt on a concrete hint, jump to a random unprivileged port
-       if (portHint != 0 && i > 0)
-         port = 1024 + random.nextInt(65535 - 1024);
-       // fold an out-of-range hint + i back into the valid range
-       if (port > 65535)
-         port = 1024 + port % (65535 - 1024);
-       try {
-         HostAndPort addr = HostAndPort.fromParts(address, port);
-         return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads,
-             service.getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), timeBetweenThreadChecks, maxMessageSize,
-             service.getServerSslParams(), service.getClientTimeoutInMillis());
-       } catch (TTransportException ex) {
-         // logged exactly once here, whichever branch is taken below
-         log.error("Unable to start TServer", ex);
-         if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
-           // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less
-           // TTransportException, and with a TSocket created by TSSLTransportFactory, it
-           // comes through as caused by a BindException.
-           log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")");
-           UtilWaitThread.sleep(250);
-         } else {
-           // thrift is passing up a nested exception that isn't a BindException,
-           // so no reason to believe retrying on a different port would help.
-           // NOTE(review): this only leaves the inner port loop; the outer loop still starts another round - confirm that is intended.
-           break;
-         }
-       }
-     }
-   }
-   throw new UnknownHostException("Unable to find a listen port");
- }
-
- /**
-  * A {@link TProcessor} wrapper that records how long each call executes, and how long the processor was idle before it, in the Thrift metrics.
-  */
- public static class TimedProcessor implements TProcessor {
-
-   final TProcessor other;
-   Metrics metrics = null;
-   // time (ms) at which the previous call finished, or at which this processor was constructed
-   long idleStart = 0;
-
-   /**
-    * Wraps {@code next} and registers the Thrift metrics created for the given server and thread names; a failed registration is only logged.
-    */
-   TimedProcessor(AccumuloConfiguration conf, TProcessor next, String serverName, String threadName) {
-     this.other = next;
-     // Register the metrics MBean
-     MetricsFactory factory = new MetricsFactory(conf);
-     metrics = factory.createThriftMetrics(serverName, threadName);
-     try {
-       metrics.register();
-     } catch (Exception e) {
-       log.error("Exception registering MBean with MBean Server", e);
-     }
-     idleStart = System.currentTimeMillis();
-   }
-
-   /**
-    * Delegates to the wrapped processor, recording idle and execute times when metrics are enabled at the start of the call.
-    */
-   @Override
-   public boolean process(TProtocol in, TProtocol out) throws TException {
-     long now = 0;
-     // Read the flag once: if metrics were switched on while the call was running, the execute
-     // time would otherwise be computed against now == 0 and record a bogus, huge duration.
-     final boolean metricsEnabled = metrics.isEnabled();
-     if (metricsEnabled) {
-       now = System.currentTimeMillis();
-       metrics.add(ThriftMetrics.idle, (now - idleStart));
-     }
-     try {
-       return other.process(in, out);
-     } finally {
-       if (metricsEnabled) {
-         idleStart = System.currentTimeMillis();
-         metrics.add(ThriftMetrics.execute, idleStart - now);
-       }
-     }
-   }
- }
-
- /**
-  * A processor factory that stores the client's address in {@link TServerUtils#clientAddress} before handing out the processor for a transport.
-  */
- public static class ClientInfoProcessorFactory extends TProcessorFactory {
-
-   public ClientInfoProcessorFactory(TProcessor processor) {
-     super(processor);
-   }
-
-   /**
-    * Records the client address when the transport is a {@link TBufferedSocket} or a {@link TSocket}; any other transport type is reported with a warning
-    * and the stored address is left untouched.
-    */
-   @Override
-   public TProcessor getProcessor(TTransport trans) {
-     final String peer;
-     if (trans instanceof TBufferedSocket) {
-       peer = ((TBufferedSocket) trans).getClientString();
-     } else if (trans instanceof TSocket) {
-       final TSocket plain = (TSocket) trans;
-       peer = plain.getSocket().getInetAddress().getHostAddress() + ":" + plain.getSocket().getPort();
-     } else {
-       log.warn("Unable to extract clientAddress from transport of type " + trans.getClass());
-       return super.getProcessor(trans);
-     }
-     clientAddress.set(peer);
-     return super.getProcessor(trans);
-   }
- }
-
- /**
-  * Create a non-blocking server backed by a thread pool whose size is re-evaluated periodically according to how busy its threads are.
-  *
-  * @param address
-  *          host and port to bind; a port of zero is replaced, in the returned address, by the port actually bound
-  * @param processor
-  *          the processor handling requests
-  * @param serverName
-  *          name used in the pool resize log messages
-  * @param threadName
-  *          not used by this method
-  * @param numThreads
-  *          initial pool size, and the lower bound when the pool is shrunk
-  * @param numSTThreads
-  *          thread count passed to {@link SimpleTimer#getInstance(int)}
-  * @param timeBetweenThreadChecks
-  *          delay before, and interval between, pool size checks
-  * @param maxMessageSize
-  *          limit applied to the transport factory and to the read buffer
-  * @return the (not yet serving) server and the address it is bound to
-  */
- public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, final String serverName, String threadName,
-     final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
-   final TNonblockingServerSocket listener = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
-   final CustomNonBlockingServer.Args serverArgs = new CustomNonBlockingServer.Args(listener);
-   serverArgs.protocolFactory(ThriftUtil.protocolFactory());
-   serverArgs.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
-   serverArgs.maxReadBufferBytes = maxMessageSize;
-   serverArgs.stopTimeoutVal(5);
-
-   // Create our own very special thread pool.
-   final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
-
-   // periodically adjust the number of threads we need by checking how busy our threads are
-   final Runnable poolResizer = new Runnable() {
-     @Override
-     public void run() {
-       // every core thread is busy: grow by the backlog, but by at most two threads
-       if (pool.getCorePoolSize() <= pool.getActiveCount()) {
-         int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
-         log.info("Increasing server thread pool size on " + serverName + " to " + larger);
-         pool.setMaximumPoolSize(larger);
-         pool.setCorePoolSize(larger);
-         return;
-       }
-       // not enough slack (more than three idle core threads) to justify shrinking
-       if (pool.getCorePoolSize() <= pool.getActiveCount() + 3) {
-         return;
-       }
-       int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
-       if (smaller == pool.getCorePoolSize()) {
-         return;
-       }
-       // ACCUMULO-2997 there is a race condition here... the active count could be higher by the time
-       // we decrease the core pool size... so the active count could end up higher than
-       // the core pool size, in which case everything will be queued... the increase case
-       // should handle this and prevent deadlock
-       log.info("Decreasing server thread pool size on " + serverName + " to " + smaller);
-       pool.setCorePoolSize(smaller);
-     }
-   };
-   SimpleTimer.getInstance(numSTThreads).schedule(poolResizer, timeBetweenThreadChecks, timeBetweenThreadChecks);
-
-   serverArgs.executorService(pool);
-   serverArgs.processorFactory(new TProcessorFactory(processor));
-   if (address.getPort() == 0) {
-     // report the port that was actually bound
-     address = HostAndPort.fromParts(address.getHostText(), listener.getPort());
-   }
-   return new ServerAddress(new CustomNonBlockingServer(serverArgs), address);
- }
-
- public static ServerAddress createThreadPoolServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads)
- throws TTransportException {
-
- // if port is zero, then we must bind to get the port number
- ServerSocket sock;
- try {
- sock = ServerSocketChannel.open().socket();
- sock.setReuseAddress(true);
- sock.bind(new InetSocketAddress(address.getHostText(), address.getPort()));
- address = HostAndPort.fromParts(address.getHostText(), sock.getLocalPort());
- } catch (IOException ex) {
- throw new TTransportException(ex);
- }
- TServerTransport transport = new TBufferedServerSocket(sock, 32 * 1024);
- return new ServerAddress(createThreadPoolServer(transport, processor), address);
- }
-
- public static TServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
- TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
- options.protocolFactory(ThriftUtil.protocolFactory());
- options.transportFactory(ThriftUtil.transportFactory());
- options.processorFactory(new ClientInfoProcessorFactory(processor));
- return new TThreadPoolServer(options);
- }
-
- public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
- throws TTransportException {
- org.apache.thrift.transport.TServerSocket transport;
- try {
- transport = ThriftUtil.getServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
- } catch (UnknownHostException e) {
- throw new TTransportException(e);
- }
- if (address.getPort() == 0) {
- address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
- }
- return new ServerAddress(createThreadPoolServer(transport, processor), address);
- }
-
- public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads,
- long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
- return startTServer(address, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads,
- timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout);
- }
-
- public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads,
- int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
-
- ServerAddress serverAddress;
- if (sslParams != null) {
- serverAddress = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams);
- } else {
- serverAddress = createNonBlockingServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
- }
- final TServer finalServer = serverAddress.server;
- Runnable serveTask = new Runnable() {
- @Override
- public void run() {
- try {
- finalServer.serve();
- } catch (Error e) {
- Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
- }
- }
- };
- serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
- Thread thread = new Daemon(serveTask, threadName);
- thread.start();
- // check for the special "bind to everything address"
- if (serverAddress.address.getHostText().equals("0.0.0.0")) {
- // can't get the address from the bind, so we'll do our best to invent our hostname
- try {
- serverAddress = new ServerAddress(finalServer, HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort()));
- } catch (UnknownHostException e) {
- throw new TTransportException(e);
- }
- }
- return serverAddress;
- }
-
- // Existing connections will keep our thread running: reach in with reflection and insist that they shutdown.
- public static void stopTServer(TServer s) {
- if (s == null)
- return;
- s.stop();
- try {
- Field f = s.getClass().getDeclaredField("executorService_");
- f.setAccessible(true);
- ExecutorService es = (ExecutorService) f.get(s);
- es.shutdownNow();
- } catch (Exception e) {
- TServerUtils.log.error("Unable to call shutdownNow", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f25d8505/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
index a822b92..337c055 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
@@ -17,9 +17,12 @@
package org.apache.accumulo.server.util;
import java.util.concurrent.ExecutorService;
+
+import org.apache.accumulo.server.thrift.TServerUtils;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TServerSocket;
import org.junit.Test;
+
import static org.junit.Assert.*;
import static org.easymock.EasyMock.*;
[5/5] accumulo git commit: ACCUMULO-3394 Fix up whitespace
Posted by el...@apache.org.
ACCUMULO-3394 Fix up whitespace
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b7927906
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b7927906
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b7927906
Branch: refs/heads/master
Commit: b7927906f980e5d49b02eff44d0edd64a038faf2
Parents: 79fa912
Author: Josh Elser <el...@apache.org>
Authored: Mon Dec 8 22:08:36 2014 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Dec 8 22:08:36 2014 -0500
----------------------------------------------------------------------
.../server/thrift/CustomNonBlockingServer.java | 4 ++--
.../apache/accumulo/server/thrift/RpcWrapper.java | 2 +-
.../server/thrift/TBufferedServerSocket.java | 16 ++++++++--------
.../server/thrift/TNonblockingServerSocket.java | 2 +-
4 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b7927906/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
index ceb0a42..a96f7b5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/CustomNonBlockingServer.java
@@ -34,7 +34,7 @@ import org.apache.thrift.transport.TTransportException;
* {@link TNonblockingServer} methods, especially {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread}, in order to override the
* {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} with
* one that reveals the client address from its transport.
- *
+ *
* <p>
* The justification for this is explained in https://issues.apache.org/jira/browse/ACCUMULO-1691, and is needed due to the repeated regressions:
* <ul>
@@ -42,7 +42,7 @@ import org.apache.thrift.transport.TTransportException;
* <li>https://issues.apache.org/jira/browse/THRIFT-1464</li>
* <li>https://issues.apache.org/jira/browse/THRIFT-2173</li>
* </ul>
- *
+ *
* <p>
* This class contains a copy of {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread} from Thrift 0.9.1, with the slight modification of
* instantiating a custom FrameBuffer, rather than the {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} and
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b7927906/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
index db1863b..53ed709 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/RpcWrapper.java
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
* {@link TException} only, by wrapping {@link RuntimeException} and {@link Error} as {@link TException}, so it doesn't just close the connection and look like
* a network issue, but informs the client that a {@link TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs similar functions as
* {@link TraceWrap}, but with the additional action of translating exceptions. See also ACCUMULO-1691 and ACCUMULO-2950.
- *
+ *
* @since 1.6.1
*/
public class RpcWrapper {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b7927906/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
index bf35bdf..5534313 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/TBufferedServerSocket.java
@@ -27,36 +27,36 @@ import org.apache.thrift.transport.TTransportException;
// Thrift-959 removed the small buffer from TSocket; this adds it back for servers
public class TBufferedServerSocket extends TServerTransport {
-
+
// expose acceptImpl
static class TServerSocket extends org.apache.thrift.transport.TServerSocket {
public TServerSocket(ServerSocket serverSocket) {
super(serverSocket);
}
-
+
public TSocket acceptImplPublic() throws TTransportException {
return acceptImpl();
}
}
-
+
final TServerSocket impl;
final int bufferSize;
-
+
public TBufferedServerSocket(ServerSocket serverSocket, int bufferSize) {
this.impl = new TServerSocket(serverSocket);
this.bufferSize = bufferSize;
}
-
+
@Override
public void listen() throws TTransportException {
impl.listen();
}
-
+
@Override
public void close() {
impl.close();
}
-
+
// Wrap accepted sockets using buffered IO
@Override
protected TTransport acceptImpl() throws TTransportException {
@@ -67,5 +67,5 @@ public class TBufferedServerSocket extends TServerTransport {
throw new TTransportException(e);
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b7927906/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
index f4b2df4..77c5ca6 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/TNonblockingServerSocket.java
@@ -36,7 +36,7 @@ import java.nio.channels.SocketChannel;
/**
* Wrapper around ServerSocketChannel.
- *
+ *
* This class is copied from org.apache.thrift.transport.TNonblockingServerSocket version 0.9.
* The only change (apart from the logging statements) is the addition of the {@link #getPort()} method to retrieve the port used by the ServerSocket.
*/