You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2014/06/13 01:20:29 UTC
[2/3] git commit: Merge remote-tracking branch
'origin/1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT
Merge remote-tracking branch 'origin/1.5.2-SNAPSHOT' into 1.6.1-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f34230f7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f34230f7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f34230f7
Branch: refs/heads/master
Commit: f34230f7bf32b31d3043aaa761514f6743382a4a
Parents: 31aea2a c5aac49
Author: John Vines <vi...@apache.org>
Authored: Thu Jun 12 19:20:04 2014 -0400
Committer: John Vines <vi...@apache.org>
Committed: Thu Jun 12 19:20:04 2014 -0400
----------------------------------------------------------------------
.../java/org/apache/accumulo/server/util/TServerUtils.java | 7 +------
1 file changed, 1 insertion(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f34230f7/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index 6d9e4c7,0000000..36487d6
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@@ -1,374 -1,0 +1,369 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.SslConnectionParams;
+import org.apache.accumulo.core.util.TBufferedSocket;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.metrics.ThriftMetrics;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.net.HostAndPort;
+
+public class TServerUtils {
+ // Logger shared by the static helpers and nested classes in this file.
+ private static final Logger log = Logger.getLogger(TServerUtils.class);
+
+ // Per-thread string identifying the remote client of the request being handled. The code in this
+ // file that populates it stores either TBufferedSocket.getClientString() or "<host address>:<port>".
+ public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
+
+ /**
+  * Immutable pairing of a Thrift server with the host and port recorded for it.
+  */
+ public static class ServerAddress {
+   public final TServer server;
+   public final HostAndPort address;
+
+   /**
+    * @param server
+    *          the Thrift server instance
+    * @param address
+    *          the host and port associated with that server
+    */
+   public ServerAddress(TServer server, HostAndPort address) {
+     this.address = address;
+     this.server = server;
+   }
+ }
+
+ /**
+  * Start a server on the configured port, retrying on other ports when that port cannot be bound.
+  * <p>
+  * Up to 100 rounds are attempted. Each round tries the configured port first; when port search is enabled it then tries up to 999 further ports, chosen at
+  * random from 1024 (inclusive) to 65535 (exclusive) when the configured port is non-zero, or consecutively when it is zero.
+  *
+  * @param conf
+  *          the configuration the given properties are read from
+  * @param address
+  *          the host name or address to bind to
+  * @param portHintProperty
+  *          the property naming the port to attempt to open; the port can be zero, meaning "any available port"
+  * @param processor
+  *          the service to be started
+  * @param serverName
+  *          the name of the class that is providing the service
+  * @param threadName
+  *          name this service's thread for better debugging
+  * @param portSearchProperty
+  *          the property enabling the search for another port, or null to try only the configured port
+  * @param minThreadProperty
+  *          the property giving the minimum number of server threads, or null to use 2
+  * @param timeBetweenThreadChecksProperty
+  *          the property giving the time between thread pool size checks, or null to use 1000 milliseconds
+  * @param maxMessageSizeProperty
+  *          the property giving the maximum message size, or null to use 10,000,000 bytes
+  * @return the server object created, and the port actually used
+  * @throws UnknownHostException
+  *           when no attempt managed to start a server
+  */
+ public static ServerAddress startServer(AccumuloConfiguration conf, String address, Property portHintProperty, TProcessor processor, String serverName,
+     String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
+     throws UnknownHostException {
+   int portHint = conf.getPort(portHintProperty);
+   int minThreads = 2;
+   if (minThreadProperty != null)
+     minThreads = conf.getCount(minThreadProperty);
+   long timeBetweenThreadChecks = 1000;
+   if (timeBetweenThreadChecksProperty != null)
+     timeBetweenThreadChecks = conf.getTimeInMillis(timeBetweenThreadChecksProperty);
+   long maxMessageSize = 10 * 1000 * 1000;
+   if (maxMessageSizeProperty != null)
+     maxMessageSize = conf.getMemoryInBytes(maxMessageSizeProperty);
+   boolean portSearch = false;
+   if (portSearchProperty != null)
+     portSearch = conf.getBoolean(portSearchProperty);
+   // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
+   TServerUtils.TimedProcessor timedProcessor = new TServerUtils.TimedProcessor(processor, serverName, threadName);
+   Random random = new Random();
+
+   // Are we going to slide around, looking for an open port? (does not change between rounds)
+   final int portsToSearch = portSearch ? 1000 : 1;
+
+   for (int j = 0; j < 100; j++) {
+     for (int i = 0; i < portsToSearch; i++) {
+       int port = portHint + i;
+       if (portHint != 0 && i > 0)
+         port = 1024 + random.nextInt(65535 - 1024);
+       if (port > 65535)
+         port = 1024 + port % (65535 - 1024);
+       try {
+         HostAndPort addr = HostAndPort.fromParts(address, port);
+         return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads, timeBetweenThreadChecks, maxMessageSize,
+             SslConnectionParams.forServer(conf), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+       } catch (TTransportException ex) {
+         // Report the failure exactly once; both branches below share this single log entry.
+         log.error("Unable to start TServer", ex);
+         if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
+           // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less
+           // TTransportException, and with a TSocket created by TSSLTransportFactory, it
+           // comes through as caused by a BindException.
+           log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")");
+           UtilWaitThread.sleep(250);
+         } else {
+           // thrift is passing up a nested exception that isn't a BindException,
+           // so no reason to believe retrying on a different port would help.
+           // NOTE(review): this only leaves the inner port loop; the outer loop still starts another round.
+           break;
+         }
+       }
+     }
+   }
+   throw new UnknownHostException("Unable to find a listen port");
+ }
+
+ /**
+ * Wraps another TProcessor and, while metrics are enabled, records how long the processor sat idle
+ * between calls and how long each call took.
+ */
+ public static class TimedProcessor implements TProcessor {
+
+ // the processor every call is delegated to
+ final TProcessor other;
+ // NOTE(review): stays null if the ThriftMetrics constructor throws below, yet process() dereferences it unconditionally
+ ThriftMetrics metrics = null;
+ // wall-clock time (ms) at which this processor was created or last finished a call
+ long idleStart = 0;
+
+ /**
+ * Wrap the given processor and try to register a ThriftMetrics instance for it; a failure to do so is
+ * logged and otherwise ignored.
+ *
+ * @param next
+ * the processor to delegate to
+ * @param serverName
+ * passed to the ThriftMetrics constructor
+ * @param threadName
+ * passed to the ThriftMetrics constructor
+ */
+ TimedProcessor(TProcessor next, String serverName, String threadName) {
+ this.other = next;
+ // Register the metrics MBean
+ try {
+ metrics = new ThriftMetrics(serverName, threadName);
+ metrics.register();
+ } catch (Exception e) {
+ log.error("Exception registering MBean with MBean Server", e);
+ }
+ idleStart = System.currentTimeMillis();
+ }
+
+ /**
+ * Delegate to the wrapped processor. When metrics are enabled, the time since the previous call
+ * finished is added as idle time before delegating, and the duration of the call is added as execute
+ * time afterwards, even if the delegate throws.
+ */
+ @Override
+ public boolean process(TProtocol in, TProtocol out) throws TException {
+ // remains 0 when metrics are disabled at this point
+ long now = 0;
+ if (metrics.isEnabled()) {
+ now = System.currentTimeMillis();
+ metrics.add(ThriftMetrics.idle, (now - idleStart));
+ }
+ try {
- try {
- return other.process(in, out);
- } catch (NullPointerException ex) {
- // THRIFT-1447 - remove with thrift 0.9
- return true;
- }
++ return other.process(in, out);
+ } finally {
+ if (metrics.isEnabled()) {
+ // the end of this call is also the start of the next idle period
+ idleStart = System.currentTimeMillis();
+ metrics.add(ThriftMetrics.execute, idleStart - now);
+ }
+ }
+ }
+ }
+
+ /**
+  * Processor factory that records the remote client's address in {@link #clientAddress} before handing out the processor for a transport.
+  */
+ public static class ClientInfoProcessorFactory extends TProcessorFactory {
+
+   public ClientInfoProcessorFactory(TProcessor processor) {
+     super(processor);
+   }
+
+   /**
+    * Store the client address for the calling thread when the transport type is recognized, warn otherwise, then defer to the parent factory.
+    */
+   @Override
+   public TProcessor getProcessor(TTransport trans) {
+     if (trans instanceof TBufferedSocket) {
+       clientAddress.set(((TBufferedSocket) trans).getClientString());
+     } else if (trans instanceof TSocket) {
+       Socket client = ((TSocket) trans).getSocket();
+       clientAddress.set(client.getInetAddress().getHostAddress() + ":" + client.getPort());
+     } else {
+       // unknown transport: whatever value the thread already held is left untouched
+       log.warn("Unable to extract clientAddress from transport of type " + trans.getClass());
+     }
+     return super.getProcessor(trans);
+   }
+ }
+
+ /**
+  * Half-sync/half-async server whose request tasks record the remote client's address in {@link #clientAddress} before invoking the request.
+  */
+ public static class THsHaServer extends org.apache.thrift.server.THsHaServer {
+   public THsHaServer(Args args) {
+     super(args);
+   }
+
+   /**
+    * Build the task that handles one framed request: note the client address for the executing thread (when the transport is a TNonblockingSocket), then
+    * invoke the frame buffer.
+    */
+   @Override
+   protected Runnable getRunnable(final FrameBuffer frameBuffer) {
+     return new Runnable() {
+       @Override
+       public void run() {
+         if (frameBuffer.trans_ instanceof TNonblockingSocket) {
+           Socket client = ((TNonblockingSocket) frameBuffer.trans_).getSocketChannel().socket();
+           clientAddress.set(client.getInetAddress().getHostAddress() + ":" + client.getPort());
+         }
+         frameBuffer.invoke();
+       }
+     };
+   }
+ }
+
+ /**
+  * Create (but do not start) a half-sync/half-async server bound to the given address, backed by a thread pool whose core size is adjusted periodically
+  * according to how busy it is.
+  *
+  * @param address
+  *          the host and port to bind to; when the port is zero the returned address carries the port reported by the server socket
+  * @param processor
+  *          the processor that handles requests
+  * @param serverName
+  *          name used in the pool-resize log messages
+  * @param threadName
+  *          not used by this method
+  * @param numThreads
+  *          initial pool size, and the floor the pool is never shrunk below
+  * @param timeBetweenThreadChecks
+  *          initial delay and period handed to the timer that resizes the pool
+  * @param maxMessageSize
+  *          limit applied to the transport factory and to the server's read buffer
+  * @return the server together with the address recorded for it
+  * @throws TTransportException
+  *           if the server socket cannot be created
+  */
+ public static ServerAddress createHsHaServer(HostAndPort address, TProcessor processor, final String serverName, String threadName, final int numThreads,
+     long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
+   TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
+   THsHaServer.Args options = new THsHaServer.Args(transport);
+   options.protocolFactory(ThriftUtil.protocolFactory());
+   options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+   options.maxReadBufferBytes = maxMessageSize;
+   options.stopTimeoutVal(5);
+   /*
+    * Create our own very special thread pool.
+    */
+   final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
+   // periodically adjust the number of threads we need by checking how busy our threads are
+   SimpleTimer.getInstance().schedule(new Runnable() {
+     @Override
+     public void run() {
+       if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+         int current = pool.getCorePoolSize();
+         int larger = current + Math.min(pool.getQueue().size(), 2);
+         // With an empty queue the size does not change; only announce an increase that actually happens.
+         if (larger > current)
+           log.info("Increasing server thread pool size on " + serverName + " to " + larger);
+         // raise the maximum first so the core size never exceeds it
+         pool.setMaximumPoolSize(larger);
+         pool.setCorePoolSize(larger);
+       } else {
+         if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+           int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
+           if (smaller != pool.getCorePoolSize()) {
+             // there is a race condition here... the active count could be higher by the time
+             // we decrease the core pool size... so the active count could end up higher than
+             // the core pool size, in which case everything will be queued... the increase case
+             // should handle this and prevent deadlock
+             log.info("Decreasing server thread pool size on " + serverName + " to " + smaller);
+             pool.setCorePoolSize(smaller);
+           }
+         }
+       }
+     }
+   }, timeBetweenThreadChecks, timeBetweenThreadChecks);
+   options.executorService(pool);
+   options.processorFactory(new TProcessorFactory(processor));
+   if (address.getPort() == 0) {
+     address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
+   }
+   return new ServerAddress(new THsHaServer(options), address);
+ }
+
+ /**
+  * Create (but do not start) a thread pool server listening on a buffered server socket bound to the given address.
+  *
+  * @param address
+  *          the host and port to bind to; the returned address always carries the port the socket actually bound
+  * @param processor
+  *          the processor that handles requests
+  * @param serverName
+  *          not used by this method
+  * @param threadName
+  *          not used by this method
+  * @param numThreads
+  *          not used by this method
+  * @return the server together with the address it is bound to
+  * @throws TTransportException
+  *           wrapping any IOException raised while opening or binding the socket
+  */
+ public static ServerAddress createThreadPoolServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads)
+     throws TTransportException {
+
+   // if port is zero, then we must bind to get the port number
+   ServerSocketChannel channel = null;
+   ServerSocket sock;
+   try {
+     channel = ServerSocketChannel.open();
+     sock = channel.socket();
+     sock.setReuseAddress(true);
+     sock.bind(new InetSocketAddress(address.getHostText(), address.getPort()));
+     address = HostAndPort.fromParts(address.getHostText(), sock.getLocalPort());
+   } catch (IOException ex) {
+     // Release the channel (and its socket) so repeated bind failures during a port search do not leak descriptors.
+     if (channel != null) {
+       try {
+         channel.close();
+       } catch (IOException ignored) {
+         // the bind failure reported below is the error that matters
+       }
+     }
+     throw new TTransportException(ex);
+   }
+   TServerTransport transport = new TBufferedServerSocket(sock, 32 * 1024);
+   return new ServerAddress(createThreadPoolServer(transport, processor), address);
+ }
+
+ /**
+  * Create (but do not start) a thread pool server on the given transport, using the ThriftUtil protocol and transport factories and a processor factory
+  * that records each client's address.
+  *
+  * @param transport
+  *          the server transport to accept connections on
+  * @param processor
+  *          the processor that handles requests
+  * @return the configured server
+  */
+ public static TServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
+   TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(transport).protocolFactory(ThriftUtil.protocolFactory())
+       .transportFactory(ThriftUtil.transportFactory()).processorFactory(new ClientInfoProcessorFactory(processor));
+   return new TThreadPoolServer(serverArgs);
+ }
+
+ /**
+  * Create (but do not start) a thread pool server on an SSL server socket obtained from ThriftUtil.
+  *
+  * @param address
+  *          the host and port to bind to; when the port is zero the returned address carries the socket's local port
+  * @param processor
+  *          the processor that handles requests
+  * @param socketTimeout
+  *          timeout handed to the server socket, narrowed to an int
+  * @param sslParams
+  *          the SSL settings for the server socket
+  * @return the server together with the address recorded for it
+  * @throws TTransportException
+  *           if the socket cannot be created, or wrapping an UnknownHostException for the host
+  */
+ public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
+     throws TTransportException {
+   final String host = address.getHostText();
+   final int requestedPort = address.getPort();
+
+   org.apache.thrift.transport.TServerSocket transport;
+   try {
+     InetAddress bindAddress = InetAddress.getByName(host);
+     transport = ThriftUtil.getServerSocket(requestedPort, (int) socketTimeout, bindAddress, sslParams);
+   } catch (UnknownHostException e) {
+     throw new TTransportException(e);
+   }
+
+   HostAndPort bound = address;
+   if (requestedPort == 0) {
+     bound = HostAndPort.fromParts(host, transport.getServerSocket().getLocalPort());
+   }
+   return new ServerAddress(createThreadPoolServer(transport, processor), bound);
+ }
+
+ /**
+  * Wrap the processor in a new {@link TimedProcessor} and start a server for it; see the overload taking a TimedProcessor for the remaining parameters.
+  */
+ public static ServerAddress startTServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads,
+     long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+   TimedProcessor timed = new TimedProcessor(processor, serverName, threadName);
+   return startTServer(address, timed, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout);
+ }
+
+ /**
+  * Create a server for the processor and run it on a new daemon thread.
+  * <p>
+  * An SSL thread pool server is created when SSL parameters are supplied, a half-sync/half-async server otherwise. If the bound host is "0.0.0.0", the
+  * returned address substitutes the local host name.
+  *
+  * @param address
+  *          the host and port to bind to
+  * @param processor
+  *          the timed processor that handles requests
+  * @param serverName
+  *          name passed on when creating the non-SSL server
+  * @param threadName
+  *          name of the thread that runs the server
+  * @param numThreads
+  *          thread count for the non-SSL server
+  * @param timeBetweenThreadChecks
+  *          pool check interval for the non-SSL server
+  * @param maxMessageSize
+  *          message size limit for the non-SSL server
+  * @param sslParams
+  *          SSL settings, or null for a non-SSL server
+  * @param sslSocketTimeout
+  *          socket timeout for the SSL server
+  * @return the started server and the address recorded for it
+  * @throws TTransportException
+  *           if the server cannot be created or the local host name cannot be determined
+  */
+ public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads,
+     long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
+
+   final ServerAddress created = (sslParams == null)
+       ? createHsHaServer(address, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize)
+       : createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams);
+   final TServer finalServer = created.server;
+
+   // serve() runs until the server is stopped, so it gets its own daemon thread
+   Runnable serveTask = new LoggingRunnable(TServerUtils.log, new Runnable() {
+     @Override
+     public void run() {
+       try {
+         finalServer.serve();
+       } catch (Error e) {
+         Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
+       }
+     }
+   });
+   new Daemon(serveTask, threadName).start();
+
+   // anything other than the special "bind to everything address" is reported as bound
+   if (!created.address.getHostText().equals("0.0.0.0")) {
+     return created;
+   }
+   // can't get the address from the bind, so we'll do our best to invent our hostname
+   try {
+     String localName = InetAddress.getLocalHost().getHostName();
+     return new ServerAddress(finalServer, HostAndPort.fromParts(localName, created.address.getPort()));
+   } catch (UnknownHostException e) {
+     throw new TTransportException(e);
+   }
+ }
+
+ /**
+  * Stop the given server and forcibly shut down its executor.
+  * <p>
+  * Existing connections will keep our thread running: reach in with reflection and insist that they shutdown. The executor is looked up as a field named
+  * "executorService_" on the server's class or any of its superclasses; if it cannot be found or used, the failure is logged and the method returns
+  * normally.
+  *
+  * @param s
+  *          the server to stop; null is ignored
+  */
+ public static void stopTServer(TServer s) {
+   if (s == null)
+     return;
+   s.stop();
+   try {
+     // getDeclaredField does not see inherited fields, so walk up the hierarchy to support subclasses too
+     Field f = null;
+     for (Class<?> c = s.getClass(); c != null && f == null; c = c.getSuperclass()) {
+       try {
+         f = c.getDeclaredField("executorService_");
+       } catch (NoSuchFieldException ignored) {
+         // not declared at this level; try the superclass
+       }
+     }
+     if (f == null)
+       throw new NoSuchFieldException("executorService_");
+     f.setAccessible(true);
+     ExecutorService es = (ExecutorService) f.get(s);
+     es.shutdownNow();
+   } catch (Exception e) {
+     TServerUtils.log.error("Unable to call shutdownNow", e);
+   }
+ }
+}