You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/06/09 22:41:06 UTC
[01/10] accumulo git commit: ACCUMULO-4335 Error conditions that
result in a Halt should ensure non-zero exit code.
Repository: accumulo
Updated Branches:
refs/heads/1.6 60ac92d49 -> f99ac9b31
refs/heads/1.7 4dabc2bda -> 462dfbc3b
refs/heads/1.8 6a5a81799 -> cb25e77a2
refs/heads/master 87fce254e -> 64547abe8
ACCUMULO-4335 Error conditions that result in a Halt should ensure non-zero exit code.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f99ac9b3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f99ac9b3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f99ac9b3
Branch: refs/heads/1.6
Commit: f99ac9b31cb2194d7b6fa2debc7e018a4410afe7
Parents: 60ac92d
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Jun 8 10:56:44 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jun 9 18:39:00 2016 -0400
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/server/util/TServerUtils.java | 2 +-
.../main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java | 2 +-
.../src/main/java/org/apache/accumulo/tserver/TabletServer.java | 4 ++--
3 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f99ac9b3/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index d932b2a..a84cf15 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@ -299,7 +299,7 @@ public class TServerUtils {
try {
finalServer.serve();
} catch (Error e) {
- Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
+ Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1);
}
}
};
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f99ac9b3/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index b98daf7..6d50e94 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -678,7 +678,7 @@ public class SimpleGarbageCollector implements Iface {
LockWatcher lockWatcher = new LockWatcher() {
@Override
public void lostLock(LockLossReason reason) {
- Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!");
+ Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f99ac9b3/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 955c365..8155ea6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3212,7 +3212,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
@Override
public void lostLock(final LockLossReason reason) {
- Halt.halt(0, new Runnable() {
+ Halt.halt(serverStopRequested ? 0 : 1, new Runnable() {
@Override
public void run() {
if (!serverStopRequested)
@@ -3224,7 +3224,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
@Override
public void unableToMonitorLockNode(final Throwable e) {
- Halt.halt(0, new Runnable() {
+ Halt.halt(1, new Runnable() {
@Override
public void run() {
log.fatal("Lost ability to monitor tablet server lock, exiting.", e);
[08/10] accumulo git commit: Merge branch '1.7' into 1.8
Posted by el...@apache.org.
Merge branch '1.7' into 1.8
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cb25e77a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cb25e77a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cb25e77a
Branch: refs/heads/1.8
Commit: cb25e77a2df29a7839a1d77357c4e6e6beafb82f
Parents: 6a5a817 462dfbc
Author: Josh Elser <el...@apache.org>
Authored: Thu Jun 9 18:40:45 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jun 9 18:40:45 2016 -0400
----------------------------------------------------------------------
----------------------------------------------------------------------
[09/10] accumulo git commit: Merge branch '1.7' into 1.8
Posted by el...@apache.org.
Merge branch '1.7' into 1.8
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cb25e77a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cb25e77a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cb25e77a
Branch: refs/heads/master
Commit: cb25e77a2df29a7839a1d77357c4e6e6beafb82f
Parents: 6a5a817 462dfbc
Author: Josh Elser <el...@apache.org>
Authored: Thu Jun 9 18:40:45 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jun 9 18:40:45 2016 -0400
----------------------------------------------------------------------
----------------------------------------------------------------------
[03/10] accumulo git commit: ACCUMULO-4335 Error conditions that
result in a Halt should ensure non-zero exit code.
Posted by el...@apache.org.
ACCUMULO-4335 Error conditions that result in a Halt should ensure non-zero exit code.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f99ac9b3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f99ac9b3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f99ac9b3
Branch: refs/heads/1.8
Commit: f99ac9b31cb2194d7b6fa2debc7e018a4410afe7
Parents: 60ac92d
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Jun 8 10:56:44 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jun 9 18:39:00 2016 -0400
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/server/util/TServerUtils.java | 2 +-
.../main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java | 2 +-
.../src/main/java/org/apache/accumulo/tserver/TabletServer.java | 4 ++--
3 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f99ac9b3/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index d932b2a..a84cf15 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@ -299,7 +299,7 @@ public class TServerUtils {
try {
finalServer.serve();
} catch (Error e) {
- Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
+ Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1);
}
}
};
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f99ac9b3/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index b98daf7..6d50e94 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -678,7 +678,7 @@ public class SimpleGarbageCollector implements Iface {
LockWatcher lockWatcher = new LockWatcher() {
@Override
public void lostLock(LockLossReason reason) {
- Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!");
+ Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f99ac9b3/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 955c365..8155ea6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3212,7 +3212,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
@Override
public void lostLock(final LockLossReason reason) {
- Halt.halt(0, new Runnable() {
+ Halt.halt(serverStopRequested ? 0 : 1, new Runnable() {
@Override
public void run() {
if (!serverStopRequested)
@@ -3224,7 +3224,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
@Override
public void unableToMonitorLockNode(final Throwable e) {
- Halt.halt(0, new Runnable() {
+ Halt.halt(1, new Runnable() {
@Override
public void run() {
log.fatal("Lost ability to monitor tablet server lock, exiting.", e);
[07/10] accumulo git commit: Merge branch '1.6' into 1.7
Posted by el...@apache.org.
Merge branch '1.6' into 1.7
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/462dfbc3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/462dfbc3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/462dfbc3
Branch: refs/heads/master
Commit: 462dfbc3b0e323552f33d185d63daa8a1833d6d7
Parents: 4dabc2b f99ac9b
Author: Josh Elser <el...@apache.org>
Authored: Thu Jun 9 18:40:34 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jun 9 18:40:34 2016 -0400
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/server/rpc/TServerUtils.java | 2 +-
.../main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java | 2 +-
.../src/main/java/org/apache/accumulo/tserver/TabletServer.java | 4 ++--
.../java/org/apache/accumulo/tserver/log/TabletServerLogger.java | 2 +-
4 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 77d6412,0000000..08ef944
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@@ -1,577 -1,0 +1,577 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import javax.net.ssl.SSLServerSocket;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.util.LoggingRunnable;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Factory methods for creating Thrift server objects
+ */
+public class TServerUtils {
+ private static final Logger log = LoggerFactory.getLogger(TServerUtils.class);
+
+ /**
+ * Static instance, passed to {@link ClientInfoProcessorFactory}, which will contain the client address of any incoming RPC.
+ */
+ public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
+
+ /**
+ * Start a server on the port named by {@code portHintProperty}. When port search is enabled and that port cannot be used, further candidate ports are tried;
+ * for a non-zero hint the additional candidates are drawn at random from 1024 to 65534.
+ *
+ * @param service
+ * RPC configuration
+ * @param hostname
+ * the host name that is paired with each candidate port to form the listen address
+ * @param portHintProperty
+ * the port to attempt to open, can be zero, meaning "any available port"
+ * @param processor
+ * the service to be started
+ * @param serverName
+ * the name of the class that is providing the service
+ * @param threadName
+ * name this service's thread for better debugging
+ * @param portSearchProperty
+ * A boolean Property to control if port-search should be used, or null to disable
+ * @param minThreadProperty
+ * A Property to control the minimum number of threads in the pool, or null to use 2
+ * @param timeBetweenThreadChecksProperty
+ * A Property to control the amount of time between checks to resize the thread pool, or null to use 1000
+ * @param maxMessageSizeProperty
+ * A Property to control the maximum Thrift message size accepted, or null to use 10 * 1000 * 1000
+ * @return the server object created, and the port actually used
+ * @throws UnknownHostException
+ * when no usable listen port was found after all attempts
+ */
+ public static ServerAddress startServer(AccumuloServerContext service, String hostname, Property portHintProperty, TProcessor processor, String serverName,
+ String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
+ throws UnknownHostException {
+ final AccumuloConfiguration config = service.getConfiguration();
+
+ final int portHint = config.getPort(portHintProperty);
+
+ int minThreads = 2;
+ if (minThreadProperty != null)
+ minThreads = config.getCount(minThreadProperty);
+
+ long timeBetweenThreadChecks = 1000;
+ if (timeBetweenThreadChecksProperty != null)
+ timeBetweenThreadChecks = config.getTimeInMillis(timeBetweenThreadChecksProperty);
+
+ long maxMessageSize = 10 * 1000 * 1000;
+ if (maxMessageSizeProperty != null)
+ maxMessageSize = config.getMemoryInBytes(maxMessageSizeProperty);
+
+ boolean portSearch = false;
+ if (portSearchProperty != null)
+ portSearch = config.getBoolean(portSearchProperty);
+
+ final int simpleTimerThreadpoolSize = config.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE);
+ final ThriftServerType serverType = service.getThriftServerType();
+
+ if (ThriftServerType.SASL == serverType) {
+ processor = updateSaslProcessor(serverType, processor);
+ }
+
+ // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
+ TimedProcessor timedProcessor = new TimedProcessor(config, processor, serverName, threadName);
+
+ Random random = new Random();
+ for (int j = 0; j < 100; j++) { // at most 100 rounds of port attempts
+
+ // Are we going to slide around, looking for an open port?
+ int portsToSearch = 1;
+ if (portSearch)
+ portsToSearch = 1000;
+
+ for (int i = 0; i < portsToSearch; i++) {
+ int port = portHint + i;
+ if (portHint != 0 && i > 0) // after the first try with a non-zero hint, pick a random port instead
+ port = 1024 + random.nextInt(65535 - 1024);
+ if (port > 65535) // fold out-of-range values back into the valid port range
+ port = 1024 + port % (65535 - 1024);
+ try {
+ HostAndPort addr = HostAndPort.fromParts(hostname, port);
+ return TServerUtils.startTServer(addr, serverType, timedProcessor, serverName, threadName, minThreads, simpleTimerThreadpoolSize,
+ timeBetweenThreadChecks, maxMessageSize, service.getServerSslParams(), service.getSaslParams(), service.getClientTimeoutInMillis());
+ } catch (TTransportException ex) {
+ log.error("Unable to start TServer", ex);
+ if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
+ // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less
+ // TTransportException, and with a TSocket created by TSSLTransportFactory, it
+ // comes through as caused by a BindException.
+ log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName);
+ UtilWaitThread.sleep(250); // pause before the next attempt
+ } else {
+ // thrift is passing up a nested exception that isn't a BindException,
+ // so no reason to believe retrying on a different port would help.
+ log.error("Unable to start TServer", ex);
+ break; // NOTE(review): this only leaves the inner port-search loop; the enclosing 100-round loop still continues
+ }
+ }
+ }
+ }
+ throw new UnknownHostException("Unable to find a listen port");
+ }
+
+  /**
+   * Build a {@link CustomNonBlockingServer} listening on the given address and backed by a self-resizing thread pool.
+   *
+   * @param address
+   *          host and port to listen on; when the port is 0 the returned address carries the port reported by the server socket
+   * @param processor
+   *          processor wrapped in a {@link TProcessorFactory} for the server
+   * @param protocolFactory
+   *          protocol factory handed to the server arguments
+   * @param serverName
+   *          name used when logging thread pool resizes
+   * @param threadName
+   *          not used by this method
+   * @param numThreads
+   *          thread count passed to {@link #createSelfResizingThreadPool(String, int, int, long)}
+   * @param numSTThreads
+   *          number of threads used to obtain the {@link SimpleTimer}
+   * @param timeBetweenThreadChecks
+   *          delay between thread pool resize checks
+   * @param maxMessageSize
+   *          used both for the transport factory and as the maximum read buffer size
+   * @return the server together with the address it is bound to
+   */
+  public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, final String serverName,
+      String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
+
+    final InetSocketAddress listenOn = new InetSocketAddress(address.getHostText(), address.getPort());
+    final TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(listenOn);
+    final CustomNonBlockingServer.Args args = new CustomNonBlockingServer.Args(serverSocket);
+
+    // wire up protocol, transport and buffer limits
+    args.protocolFactory(protocolFactory);
+    args.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+    args.maxReadBufferBytes = maxMessageSize;
+    args.stopTimeoutVal(5);
+
+    // the executor adjusts its own size periodically
+    final ThreadPoolExecutor executor = createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+    args.executorService(executor);
+    args.processorFactory(new TProcessorFactory(processor));
+
+    // a requested port of 0 means the socket picked one; report that port instead
+    final HostAndPort boundAddress = address.getPort() == 0 ? HostAndPort.fromParts(address.getHostText(), serverSocket.getPort()) : address;
+
+    return new ServerAddress(new CustomNonBlockingServer(args), boundAddress);
+  }
+
+  /**
+   * Creates a {@link SimpleThreadPool} and registers a recurring task with {@link SimpleTimer} that compares the pool's core size with its number of active
+   * threads, growing the pool when every core thread is busy and shrinking it by one when more than three core threads are idle.
+   *
+   * @param serverName
+   *          A name to describe the thrift server this executor will service; only used in log messages
+   * @param executorThreads
+   *          The size handed to the {@link SimpleThreadPool}; also the floor below which the core pool size is never reduced
+   * @param simpleTimerThreads
+   *          The numbers of threads used to get the {@link SimpleTimer} instance
+   * @param timeBetweenThreadChecks
+   *          Used as both the initial delay and the period of the resize task
+   * @return A {@link ThreadPoolExecutor} which will resize itself automatically
+   */
+  public static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, final int executorThreads, int simpleTimerThreads,
+      long timeBetweenThreadChecks) {
+    final ThreadPoolExecutor executor = new SimpleThreadPool(executorThreads, "ClientPool");
+
+    // Sampling and adjusting are not atomic, which is acceptable because the task runs again periodically.
+    final Runnable resizeTask = new Runnable() {
+      @Override
+      public void run() {
+        if (executor.getCorePoolSize() <= executor.getActiveCount()) {
+          // every core thread is busy: grow by the queue backlog, capped at two threads per check
+          int grown = executor.getCorePoolSize() + Math.min(executor.getQueue().size(), 2);
+          log.info("Increasing server thread pool size on {} to {}", serverName, grown);
+          executor.setMaximumPoolSize(grown);
+          executor.setCorePoolSize(grown);
+          return;
+        }
+        if (executor.getCorePoolSize() <= executor.getActiveCount() + 3) {
+          // not enough idle threads to justify shrinking
+          return;
+        }
+        int shrunk = Math.max(executorThreads, executor.getCorePoolSize() - 1);
+        if (shrunk != executor.getCorePoolSize()) {
+          log.info("Decreasing server thread pool size on {} to {}", serverName, shrunk);
+          executor.setCorePoolSize(shrunk);
+        }
+      }
+    };
+
+    SimpleTimer.getInstance(simpleTimerThreads).schedule(resizeTask, timeBetweenThreadChecks, timeBetweenThreadChecks);
+    return executor;
+  }
+
+  /**
+   * Creates a TThreadPoolServer for normal unsecure operation. Useful for comparing performance against SSL or SASL transports.
+   *
+   * @param address
+   *          Host and port to bind to; when the port is 0 the returned address carries the port the socket was actually bound to
+   * @param processor
+   *          TProcessor for the server
+   * @param protocolFactory
+   *          Protocol factory for the server
+   * @param maxMessageSize
+   *          Maximum size of a Thrift message allowed
+   * @param serverName
+   *          Name used when logging thread pool resizes
+   * @param numThreads
+   *          Thread count passed to {@link #createSelfResizingThreadPool(String, int, int, long)}
+   * @param numSimpleTimerThreads
+   *          Number of threads used to obtain the {@link SimpleTimer}
+   * @param timeBetweenThreadChecks
+   *          Delay between thread pool resize checks
+   * @return A configured TThreadPoolServer and its bound address information
+   */
+  public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long maxMessageSize,
+      String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException {
+
+    // Bind through an InetSocketAddress: the port-only TServerSocket constructor would ignore the host given in 'address'
+    InetSocketAddress bindAddress = new InetSocketAddress(address.getHostText(), address.getPort());
+    TServerSocket transport = new TServerSocket(bindAddress);
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
+    TThreadPoolServer server = createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize), protocolFactory, pool);
+
+    // a requested port of 0 lets the socket choose; report the port it actually got
+    if (address.getPort() == 0) {
+      address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+    }
+
+    return new ServerAddress(server, address);
+
+  }
+
+ /**
+ * Create a {@link TThreadPoolServer} with the provided transport, processor and transport factory. Delegates to
+ * {@link #createTThreadPoolServer(TServerTransport, TProcessor, TTransportFactory, TProtocolFactory, ExecutorService)} with a null executor service, so no
+ * executor service is set on the server arguments.
+ *
+ * @param transport
+ * Server transport
+ * @param processor
+ * Processor implementation
+ * @param transportFactory
+ * Transport factory
+ * @param protocolFactory
+ * Protocol factory
+ * @return A configured {@link TThreadPoolServer}
+ */
+ public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory) {
+ return createTThreadPoolServer(transport, processor, transportFactory, protocolFactory, null);
+ }
+
+  /**
+   * Build a {@link TThreadPoolServer} from the given server transport, processor, transport factory and protocol factory. The processor is wrapped in a
+   * {@link ClientInfoProcessorFactory} together with {@link #clientAddress}.
+   *
+   * @param transport
+   *          TServerTransport for the server
+   * @param processor
+   *          TProcessor for the server
+   * @param transportFactory
+   *          TTransportFactory for the server
+   * @param protocolFactory
+   *          TProtocolFactory for the server
+   * @param service
+   *          executor service for the server, or null to leave the server arguments' executor service unset
+   * @return the configured {@link TThreadPoolServer}
+   */
+  public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory,
+      TProtocolFactory protocolFactory, ExecutorService service) {
+    final TThreadPoolServer.Args args = new TThreadPoolServer.Args(transport).protocolFactory(protocolFactory).transportFactory(transportFactory)
+        .processorFactory(new ClientInfoProcessorFactory(clientAddress, processor));
+    // only override the executor when the caller supplied one
+    if (service != null) {
+      args.executorService(service);
+    }
+    return new TThreadPoolServer(args);
+  }
+
+  /**
+   * Create the Thrift server socket for RPC running over SSL and, when the underlying socket is an {@link SSLServerSocket}, restrict its enabled protocols to
+   * those that are both enabled on the socket and listed in the configuration.
+   *
+   * @param port
+   *          Port of the server socket to bind to
+   * @param timeout
+   *          Socket timeout
+   * @param address
+   *          Address to bind the socket to
+   * @param params
+   *          SSL parameters
+   * @return A configured TServerSocket configured to use SSL
+   * @throws RuntimeException
+   *           if none of the configured protocols is enabled on the socket
+   */
+  public static TServerSocket getSslServerSocket(int port, int timeout, InetAddress address, SslConnectionParams params) throws TTransportException {
+    final TServerSocket tServerSock = params.useJsse() ? TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address)
+        : TSSLTransportFactory.getServerSocket(port, timeout, address, params.getTTransportParams());
+
+    final ServerSocket rawSocket = tServerSock.getServerSocket();
+    if (!(rawSocket instanceof SSLServerSocket)) {
+      // nothing to restrict on a non-SSL socket
+      return tServerSock;
+    }
+
+    final SSLServerSocket sslSocket = (SSLServerSocket) rawSocket;
+    final String[] configuredProtocols = params.getServerProtocols();
+
+    // Silently drop configured protocols the JVM does not offer (e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6) so users need not edit their configuration:
+    // start from what the socket has enabled and keep only what the configuration allows.
+    final Set<String> usableProtocols = new HashSet<String>(Arrays.asList(sslSocket.getEnabledProtocols()));
+    usableProtocols.retainAll(Arrays.asList(configuredProtocols));
+    if (usableProtocols.isEmpty()) {
+      // the configuration and the JVM have no protocol in common
+      throw new RuntimeException("No available protocols available for secure socket. Availaable protocols: "
+          + Arrays.toString(sslSocket.getEnabledProtocols()) + ", allowed protocols: " + Arrays.toString(configuredProtocols));
+    }
+
+    sslSocket.setEnabledProtocols(usableProtocols.toArray(new String[0]));
+    return tServerSock;
+  }
+
+ /**
+ * Create a Thrift SSL server: an SSL server socket from {@link #getSslServerSocket(int, int, InetAddress, SslConnectionParams)} served by a
+ * {@link TThreadPoolServer} that runs on a self-resizing thread pool.
+ *
+ * @param address
+ * host and port to bind to; when the port is 0 the returned address carries the local port of the created socket
+ * @param processor
+ * TProcessor for the server
+ * @param protocolFactory
+ * Protocol factory for the server
+ * @param socketTimeout
+ * Socket timeout; narrowed to an int before it is passed on
+ * @param sslParams
+ * SSL parameters
+ * @param serverName
+ * Name used when logging thread pool resizes
+ * @param numThreads
+ * Thread count passed to {@link #createSelfResizingThreadPool(String, int, int, long)}
+ * @param numSimpleTimerThreads
+ * Number of threads used to obtain the {@link SimpleTimer}
+ * @param timeBetweenThreadChecks
+ * Delay between thread pool resize checks
+ * @return A ServerAddress with the bound-socket information and the Thrift server
+ * @throws TTransportException
+ * if the socket cannot be created, including when the host in {@code address} cannot be resolved
+ */
+ public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout,
+ SslConnectionParams sslParams, String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException {
+ TServerSocket transport;
+ try {
+ transport = getSslServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
+ } catch (UnknownHostException e) {
+ throw new TTransportException(e); // surface resolution failures through the declared exception type
+ }
+
+ if (address.getPort() == 0) { // port 0 was requested: report the port the socket actually got
+ address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+ }
+
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
+
+ return new ServerAddress(createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(), protocolFactory, pool), address);
+ }
+
+  /**
+   * Create a {@link TThreadPoolServer} whose transports perform a SASL handshake. A GSSAPI server definition is always registered; a DIGEST-MD5 definition is
+   * added when {@code params} supplies a secret manager. The transport factory is wrapped in a {@link UGIAssumingTransportFactory} for the login user.
+   *
+   * @param address
+   *          host and port to bind to; when the port is 0 the returned address carries the local port of the created socket
+   * @param processor
+   *          TProcessor for the server
+   * @param protocolFactory
+   *          Protocol factory for the server
+   * @param socketTimeout
+   *          Socket timeout; narrowed to an int before it is passed on
+   * @param params
+   *          SASL parameters (Kerberos server primary, SASL properties, optional secret manager)
+   * @param serverName
+   *          Name used when logging thread pool resizes
+   * @param threadName
+   *          not used by this method
+   * @param numThreads
+   *          Thread count passed to {@link #createSelfResizingThreadPool(String, int, int, long)}
+   * @param numSTThreads
+   *          Number of threads used to obtain the {@link SimpleTimer}
+   * @param timeBetweenThreadChecks
+   *          Delay between thread pool resize checks
+   * @return A ServerAddress with the bound-socket information and the Thrift server
+   * @throws TTransportException
+   *           if the socket cannot be created, a host name cannot be resolved, or the login user cannot be obtained
+   * @throws RuntimeException
+   *           if the canonical name of the requested host differs from the FQDN of this host
+   */
+  public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout,
+      SaslServerConnectionParams params, final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks)
+      throws TTransportException {
+    // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the TThreadPoolServer does,
+    // but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when it open()'s which will fail
+    // when the server does an accept() to (presumably) wake up the eventing system.
+    log.info("Creating SASL thread pool thrift server on listening on {}:{}", address.getHostText(), address.getPort());
+    // Bind through an InetSocketAddress: the port-only TServerSocket constructor would ignore the host given in 'address'
+    InetSocketAddress bindAddress = new InetSocketAddress(address.getHostText(), address.getPort());
+    TServerSocket transport = new TServerSocket(bindAddress, (int) socketTimeout);
+
+    String hostname, fqdn;
+    try {
+      hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName();
+      fqdn = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException e) {
+      throw new TTransportException(e);
+    }
+
+    // If we can't get a real hostname from the provided host text, use the hostname from DNS for localhost
+    if ("0.0.0.0".equals(hostname)) {
+      hostname = fqdn;
+    }
+
+    // ACCUMULO-3497 an easy sanity check we can perform for the user when SASL is enabled. Clients and servers have to agree upon the FQDN
+    // so that the SASL handshake can occur. If the provided hostname doesn't match the FQDN for this host, fail quickly and inform them to update
+    // their configuration.
+    if (!hostname.equals(fqdn)) {
+      log.error(
+          "Expected hostname of '{}' but got '{}'. Ensure the entries in the Accumulo hosts files (e.g. masters, slaves) are the FQDN for each host when using SASL.",
+          fqdn, hostname);
+      throw new RuntimeException("SASL requires that the address the thrift server listens on is the same as the FQDN for this host");
+    }
+
+    final UserGroupInformation serverUser;
+    try {
+      serverUser = UserGroupInformation.getLoginUser();
+    } catch (IOException e) {
+      throw new TTransportException(e);
+    }
+
+    log.debug("Logged in as {}, creating TSaslServerTransport factory with {}/{}", serverUser, params.getKerberosServerPrimary(), hostname);
+
+    // Make the SASL transport factory with the instance and primary from the kerberos server principal, SASL properties
+    // and the SASL callback handler from Hadoop to ensure authorization ID is the authentication ID. Despite the 'protocol' argument seeming to be useless, it
+    // *must* be the primary of the server.
+    TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();
+    saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
+        new SaslRpcServer.SaslGssCallbackHandler());
+
+    if (null != params.getSecretManager()) {
+      log.info("Adding DIGEST-MD5 server definition for delegation tokens");
+      saslTransportFactory.addServerDefinition(ThriftUtil.DIGEST_MD5, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
+          new SaslServerDigestCallbackHandler(params.getSecretManager()));
+    } else {
+      log.info("SecretManager is null, not adding support for delegation token authentication");
+    }
+
+    // Make sure the TTransportFactory is performing a UGI.doAs
+    TTransportFactory ugiTransportFactory = new UGIAssumingTransportFactory(saslTransportFactory, serverUser);
+
+    if (address.getPort() == 0) {
+      // If the port was chosen dynamically, report the port actually bound (along with the proper hostname)
+      address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+      log.info("SASL thrift server bound on {}", address);
+    }
+
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+
+    final TThreadPoolServer server = createTThreadPoolServer(transport, processor, ugiTransportFactory, protocolFactory, pool);
+
+    return new ServerAddress(server, address);
+  }
+
+ public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, ThriftServerType serverType, TProcessor processor,
+ String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+ SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+
+ if (ThriftServerType.SASL == serverType) {
+ processor = updateSaslProcessor(serverType, processor);
+ }
+
+ return startTServer(address, serverType, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads,
+ timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, serverSocketTimeout);
+ }
+
+ /**
+ * @see #startTServer(HostAndPort, ThriftServerType, TimedProcessor, TProtocolFactory, String, String, int, int, long, long, SslConnectionParams,
+ * SaslServerConnectionParams, long)
+ */
+ public static ServerAddress startTServer(HostAndPort address, ThriftServerType serverType, TimedProcessor processor, String serverName, String threadName,
+ int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+ SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+ return startTServer(address, serverType, processor, ThriftUtil.protocolFactory(), serverName, threadName, numThreads, numSTThreads,
+ timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, serverSocketTimeout);
+ }
+
+ /**
+ * Start the Thrift server selected by <code>serverType</code> (SSL, SASL, blocking thread pool, or the default custom half-async server) for the given parameters. SSL and SASL parameters must not both be non-null.
+ *
+ * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to.
+ */
+ public static ServerAddress startTServer(HostAndPort address, ThriftServerType serverType, TimedProcessor processor, TProtocolFactory protocolFactory,
+ String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+ SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+
+ // This is presently not supported. It's hypothetically possible, I believe, to work, but it would require changes in how the transports
+ // work at the Thrift layer to ensure that both the SSL and SASL handshakes function. SASL's quality of protection addresses privacy issues.
+ checkArgument(!(sslParams != null && saslParams != null), "Cannot start a Thrift server using both SSL and SASL");
+
+ ServerAddress serverAddress;
+ switch (serverType) {
+ case SSL:
+ log.debug("Instantiating SSL Thrift server");
+ serverAddress = createSslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout, sslParams, serverName, numThreads, numSTThreads,
+ timeBetweenThreadChecks);
+ break;
+ case SASL:
+ log.debug("Instantiating SASL Thrift server");
+ serverAddress = createSaslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout, saslParams, serverName, threadName, numThreads,
+ numSTThreads, timeBetweenThreadChecks);
+ break;
+ case THREADPOOL:
+ log.debug("Instantiating unsecure TThreadPool Thrift server");
+ serverAddress = createBlockingServer(address, processor, protocolFactory, maxMessageSize, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+ break;
+ case CUSTOM_HS_HA: // Intentional passthrough -- Our custom wrapper around HsHa is the default
+ default:
+ log.debug("Instantiating default, unsecure custom half-async Thrift server");
+ serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks,
+ maxMessageSize);
+ }
+
+ final TServer finalServer = serverAddress.server;
+ Runnable serveTask = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ finalServer.serve();
+ } catch (Error e) {
- Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
++ Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1);
+ }
+ }
+ };
+
+ serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
+ Thread thread = new Daemon(serveTask, threadName);
+ thread.start();
+
+ // check for the special "bind to everything address"
+ if (serverAddress.address.getHostText().equals("0.0.0.0")) {
+ // can't get the address from the bind, so we'll do our best to invent our hostname
+ try {
+ serverAddress = new ServerAddress(finalServer, HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort()));
+ } catch (UnknownHostException e) {
+ throw new TTransportException(e);
+ }
+ }
+ return serverAddress;
+ }
+
+ /**
+ * Stop a Thrift TServer. Existing connections will keep our thread running; use reflection to forcibly shut down the threadpool.
+ *
+ * @param s
+ * The TServer to stop
+ */
+ public static void stopTServer(TServer s) {
+ if (s == null)
+ return;
+ s.stop();
+ try {
+ Field f = s.getClass().getDeclaredField("executorService_");
+ f.setAccessible(true);
+ ExecutorService es = (ExecutorService) f.get(s);
+ es.shutdownNow();
+ } catch (Exception e) {
+ log.error("Unable to call shutdownNow", e);
+ }
+ }
+
+ /**
+ * Wrap the provided processor in the {@link UGIAssumingProcessor} so Kerberos authentication works. Requires the <code>serverType</code> to be
+ * {@link ThriftServerType#SASL} and throws an exception when it is not.
+ *
+ * @return A {@link UGIAssumingProcessor} which wraps the provided processor
+ */
+ private static TProcessor updateSaslProcessor(ThriftServerType serverType, TProcessor processor) {
+ checkArgument(ThriftServerType.SASL == serverType);
+
+ // Wrap the provided processor in our special processor which proxies the provided UGI on the logged-in UGI
+ // Important that we have Timed -> UGIAssuming -> [provided] to make sure that the metrics are still reported
+ // as the logged-in user.
+ log.info("Wrapping {} in UGIAssumingProcessor", processor.getClass());
+
+ return new UGIAssumingProcessor(processor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index a3b224f,8155ea6..a7abe05
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -2370,10 -3224,10 +2370,10 @@@ public class TabletServer extends Accum
@Override
public void unableToMonitorLockNode(final Throwable e) {
- Halt.halt(0, new Runnable() {
+ Halt.halt(1, new Runnable() {
@Override
public void run() {
- log.fatal("Lost ability to monitor tablet server lock, exiting.", e);
+ log.error("Lost ability to monitor tablet server lock, exiting.", e);
}
});
http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index c7b6c98,158fdbd..bb8ae6f
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -213,30 -192,16 +213,30 @@@ public class TabletServerLogger
this.createTime = System.currentTimeMillis();
return;
} catch (Exception t) {
- throw new RuntimeException(t);
- }
- }
+ if (null == retry) {
+ retry = retryFactory.create();
+ }
- public void resetLoggers() throws IOException {
- logSetLock.writeLock().lock();
- try {
- close();
- } finally {
- logSetLock.writeLock().unlock();
+ // We have more retries or we exceeded the maximum number of accepted failures
+ if (retry.canRetry()) {
+ // Use the retry and record the time in which we did so
+ retry.useRetry();
+
+ try {
+ // Backoff
+ retry.waitForNextAttempt();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ } else {
+ log.error("Repeatedly failed to create WAL. Going to exit tabletserver.", t);
+ // We didn't have retries or we failed too many times.
- Halt.halt("Experienced too many errors creating WALs, giving up");
++ Halt.halt("Experienced too many errors creating WALs, giving up", 1);
+ }
+
+ // The exception will trigger the log creation to be re-attempted.
+ throw new RuntimeException(t);
}
}
[06/10] accumulo git commit: Merge branch '1.6' into 1.7
Posted by el...@apache.org.
Merge branch '1.6' into 1.7
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/462dfbc3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/462dfbc3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/462dfbc3
Branch: refs/heads/1.7
Commit: 462dfbc3b0e323552f33d185d63daa8a1833d6d7
Parents: 4dabc2b f99ac9b
Author: Josh Elser <el...@apache.org>
Authored: Thu Jun 9 18:40:34 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jun 9 18:40:34 2016 -0400
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/server/rpc/TServerUtils.java | 2 +-
.../main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java | 2 +-
.../src/main/java/org/apache/accumulo/tserver/TabletServer.java | 4 ++--
.../java/org/apache/accumulo/tserver/log/TabletServerLogger.java | 2 +-
4 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 77d6412,0000000..08ef944
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@@ -1,577 -1,0 +1,577 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import javax.net.ssl.SSLServerSocket;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.util.LoggingRunnable;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Factory methods for creating Thrift server objects
+ */
+public class TServerUtils {
+ private static final Logger log = LoggerFactory.getLogger(TServerUtils.class);
+
+ /**
+ * Static instance, passed to {@link ClientInfoProcessorFactory}, which will contain the client address of any incoming RPC.
+ */
+ public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
+
+ /**
+ * Start a server at the configured port, retrying when the port cannot be bound; if port search is enabled and the port hint is non-zero, retries use randomly chosen ports.
+ *
+ * @param service
+ * RPC configuration
+ * @param portHintProperty
+ * the property holding the port to attempt to open; the port can be zero, meaning "any available port"
+ * @param processor
+ * the service to be started
+ * @param serverName
+ * the name of the class that is providing the service
+ * @param threadName
+ * name this service's thread for better debugging
+ * @param portSearchProperty
+ * A boolean Property to control if port-search should be used, or null to disable
+ * @param minThreadProperty
+ * A Property to control the minimum number of threads in the pool
+ * @param timeBetweenThreadChecksProperty
+ * A Property to control the amount of time between checks to resize the thread pool
+ * @param maxMessageSizeProperty
+ * A Property to control the maximum Thrift message size accepted
+ * @return the server object created, and the port actually used
+ * @throws UnknownHostException
+ * when the server could not be started on any of the attempted ports
+ */
+ public static ServerAddress startServer(AccumuloServerContext service, String hostname, Property portHintProperty, TProcessor processor, String serverName,
+ String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
+ throws UnknownHostException {
+ final AccumuloConfiguration config = service.getConfiguration();
+
+ final int portHint = config.getPort(portHintProperty);
+
+ int minThreads = 2;
+ if (minThreadProperty != null)
+ minThreads = config.getCount(minThreadProperty);
+
+ long timeBetweenThreadChecks = 1000;
+ if (timeBetweenThreadChecksProperty != null)
+ timeBetweenThreadChecks = config.getTimeInMillis(timeBetweenThreadChecksProperty);
+
+ long maxMessageSize = 10 * 1000 * 1000;
+ if (maxMessageSizeProperty != null)
+ maxMessageSize = config.getMemoryInBytes(maxMessageSizeProperty);
+
+ boolean portSearch = false;
+ if (portSearchProperty != null)
+ portSearch = config.getBoolean(portSearchProperty);
+
+ final int simpleTimerThreadpoolSize = config.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE);
+ final ThriftServerType serverType = service.getThriftServerType();
+
+ if (ThriftServerType.SASL == serverType) {
+ processor = updateSaslProcessor(serverType, processor);
+ }
+
+ // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
+ TimedProcessor timedProcessor = new TimedProcessor(config, processor, serverName, threadName);
+
+ Random random = new Random();
+ for (int j = 0; j < 100; j++) {
+
+ // Are we going to slide around, looking for an open port?
+ int portsToSearch = 1;
+ if (portSearch)
+ portsToSearch = 1000;
+
+ for (int i = 0; i < portsToSearch; i++) {
+ int port = portHint + i;
+ if (portHint != 0 && i > 0)
+ port = 1024 + random.nextInt(65535 - 1024);
+ if (port > 65535)
+ port = 1024 + port % (65535 - 1024);
+ try {
+ HostAndPort addr = HostAndPort.fromParts(hostname, port);
+ return TServerUtils.startTServer(addr, serverType, timedProcessor, serverName, threadName, minThreads, simpleTimerThreadpoolSize,
+ timeBetweenThreadChecks, maxMessageSize, service.getServerSslParams(), service.getSaslParams(), service.getClientTimeoutInMillis());
+ } catch (TTransportException ex) {
+ log.error("Unable to start TServer", ex);
+ if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
+ // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less
+ // TTransportException, and with a TSocket created by TSSLTransportFactory, it
+ // comes through as caused by a BindException.
+ log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName);
+ UtilWaitThread.sleep(250);
+ } else {
+ // thrift is passing up a nested exception that isn't a BindException,
+ // so no reason to believe retrying on a different port would help.
+ log.error("Unable to start TServer", ex);
+ break;
+ }
+ }
+ }
+ }
+ throw new UnknownHostException("Unable to find a listen port");
+ }
+
+ /**
+ * Create a NonBlockingServer with a custom thread pool that can dynamically resize itself.
+ */
+ public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, final String serverName,
+ String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
+
+ final TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
+ final CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);
+
+ options.protocolFactory(protocolFactory);
+ options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+ options.maxReadBufferBytes = maxMessageSize;
+ options.stopTimeoutVal(5);
+
+ // Create our own very special thread pool.
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+
+ options.executorService(pool);
+ options.processorFactory(new TProcessorFactory(processor));
+
+ if (address.getPort() == 0) {
+ address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
+ }
+
+ return new ServerAddress(new CustomNonBlockingServer(options), address);
+ }
+
+ /**
+ * Creates a {@link SimpleThreadPool} which uses {@link SimpleTimer} to inspect the core pool size and number of active threads of the
+ * {@link ThreadPoolExecutor} and increase or decrease the core pool size based on activity (excessive or lack thereof).
+ *
+ * @param serverName
+ * A name to describe the thrift server this executor will service
+ * @param executorThreads
+ * The thread count the executor is created with; the resize task never shrinks the core pool size below this value
+ * @param simpleTimerThreads
+ * The numbers of threads used to get the {@link SimpleTimer} instance
+ * @param timeBetweenThreadChecks
+ * The amount of time, in millis, between attempts to resize the executor thread pool
+ * @return A {@link ThreadPoolExecutor} which will resize itself automatically
+ */
+ public static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, final int executorThreads, int simpleTimerThreads,
+ long timeBetweenThreadChecks) {
+ final ThreadPoolExecutor pool = new SimpleThreadPool(executorThreads, "ClientPool");
+ // periodically adjust the number of threads we need by checking how busy our threads are
+ SimpleTimer.getInstance(simpleTimerThreads).schedule(new Runnable() {
+ @Override
+ public void run() {
+ // there is a minor race condition between sampling the current state of the thread pool and adjusting it
+ // however, this isn't really an issue, since it adjusts periodically anyway
+ if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+ int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
+ log.info("Increasing server thread pool size on {} to {}", serverName, larger);
+ pool.setMaximumPoolSize(larger);
+ pool.setCorePoolSize(larger);
+ } else {
+ if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+ int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
+ if (smaller != pool.getCorePoolSize()) {
+ log.info("Decreasing server thread pool size on {} to {}", serverName, smaller);
+ pool.setCorePoolSize(smaller);
+ }
+ }
+ }
+ }
+ }, timeBetweenThreadChecks, timeBetweenThreadChecks);
+ return pool;
+ }
+
+ /**
+ * Creates a TThreadPoolServer for normal unsecure operation. Useful for comparing performance against SSL or SASL transports.
+ *
+ * @param address
+ * Address to bind to
+ * @param processor
+ * TProcessor for the server
+ * @param maxMessageSize
+ * Maximum size of a Thrift message allowed
+ * @return A configured TThreadPoolServer and its bound address information
+ */
+ public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long maxMessageSize,
+ String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException {
+
+ TServerSocket transport = new TServerSocket(address.getPort());
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
+ TThreadPoolServer server = createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize), protocolFactory, pool);
+
+ if (address.getPort() == 0) {
+ address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+ }
+
+ return new ServerAddress(server, address);
+
+ }
+
+ /**
+ * Create a {@link TThreadPoolServer} with the provided transport, processor and transport factory.
+ *
+ * @param transport
+ * Server transport
+ * @param processor
+ * Processor implementation
+ * @param transportFactory
+ * Transport factory
+ * @return A configured {@link TThreadPoolServer}
+ */
+ public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory) {
+ return createTThreadPoolServer(transport, processor, transportFactory, protocolFactory, null);
+ }
+
+ /**
+ * Create a {@link TThreadPoolServer} with the provided server transport, processor and transport factory.
+ *
+ * @param transport
+ * TServerTransport for the server
+ * @param processor
+ * TProcessor for the server
+ * @param transportFactory
+ * TTransportFactory for the server
+ */
+ public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory, ExecutorService service) {
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.protocolFactory(protocolFactory);
+ options.transportFactory(transportFactory);
+ options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor));
+ if (null != service) {
+ options.executorService(service);
+ }
+ return new TThreadPoolServer(options);
+ }
+
+ /**
+ * Create the Thrift server socket for RPC running over SSL.
+ *
+ * @param port
+ * Port of the server socket to bind to
+ * @param timeout
+ * Socket timeout
+ * @param address
+ * Address to bind the socket to
+ * @param params
+ * SSL parameters
+ * @return A TServerSocket configured to use SSL
+ */
+ public static TServerSocket getSslServerSocket(int port, int timeout, InetAddress address, SslConnectionParams params) throws TTransportException {
+ TServerSocket tServerSock;
+ if (params.useJsse()) {
+ tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address);
+ } else {
+ tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address, params.getTTransportParams());
+ }
+
+ final ServerSocket serverSock = tServerSock.getServerSocket();
+ if (serverSock instanceof SSLServerSocket) {
+ SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
+ String[] protocols = params.getServerProtocols();
+
+ // Be nice for the user and automatically remove protocols that might not exist in their JVM. Keeps us from forcing config alterations too
+ // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6
+ Set<String> socketEnabledProtocols = new HashSet<String>(Arrays.asList(sslServerSock.getEnabledProtocols()));
+ // Keep only the enabled protocols that were specified by the configuration
+ socketEnabledProtocols.retainAll(Arrays.asList(protocols));
+ if (socketEnabledProtocols.isEmpty()) {
+ // Bad configuration...
+ throw new RuntimeException("No available protocols available for secure socket. Availaable protocols: "
+ + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols: " + Arrays.toString(protocols));
+ }
+
+ // Set the protocol(s) on the server socket
+ sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0]));
+ }
+
+ return tServerSock;
+ }
+
+ /**
+ * Create a Thrift SSL server.
+ *
+ * @param address
+ * host and port to bind to
+ * @param processor
+ * TProcessor for the server
+ * @param socketTimeout
+ * Socket timeout
+ * @param sslParams
+ * SSL parameters
+ * @return A ServerAddress with the bound-socket information and the Thrift server
+ */
+ public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout,
+ SslConnectionParams sslParams, String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException {
+ TServerSocket transport;
+ try {
+ transport = getSslServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
+ } catch (UnknownHostException e) {
+ throw new TTransportException(e);
+ }
+
+ if (address.getPort() == 0) {
+ address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+ }
+
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
+
+ return new ServerAddress(createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(), protocolFactory, pool), address);
+ }
+
+ public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout,
+ SaslServerConnectionParams params, final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks)
+ throws TTransportException {
+ // We'd really prefer to use THsHaServer (or similar) to avoid the 1 RPC == 1 Thread model of the TThreadPoolServer,
+ // but sadly that isn't possible: TSaslTransport needs to issue a handshake when it open()'s, which will fail
+ // when the server does an accept() to (presumably) wake up the eventing system.
+ log.info("Creating SASL thread pool thrift server on listening on {}:{}", address.getHostText(), address.getPort());
+ TServerSocket transport = new TServerSocket(address.getPort(), (int) socketTimeout);
+
+ String hostname, fqdn;
+ try {
+ hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName();
+ fqdn = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ throw new TTransportException(e);
+ }
+
+ // If we can't get a real hostname from the provided host text, use the hostname from DNS for localhost
+ if ("0.0.0.0".equals(hostname)) {
+ hostname = fqdn;
+ }
+
+ // ACCUMULO-3497 an easy sanity check we can perform for the user when SASL is enabled. Clients and servers have to agree upon the FQDN
+ // so that the SASL handshake can occur. If the provided hostname doesn't match the FQDN for this host, fail quickly and inform them to update
+ // their configuration.
+ if (!hostname.equals(fqdn)) {
+ log.error(
+ "Expected hostname of '{}' but got '{}'. Ensure the entries in the Accumulo hosts files (e.g. masters, slaves) are the FQDN for each host when using SASL.",
+ fqdn, hostname);
+ throw new RuntimeException("SASL requires that the address the thrift server listens on is the same as the FQDN for this host");
+ }
+
+ final UserGroupInformation serverUser;
+ try {
+ serverUser = UserGroupInformation.getLoginUser();
+ } catch (IOException e) {
+ throw new TTransportException(e);
+ }
+
+ log.debug("Logged in as {}, creating TSaslServerTransport factory with {}/{}", serverUser, params.getKerberosServerPrimary(), hostname);
+
+ // Make the SASL transport factory with the instance and primary from the kerberos server principal, SASL properties
+ // and the SASL callback handler from Hadoop to ensure authorization ID is the authentication ID. Despite the 'protocol' argument seeming to be useless, it
+ // *must* be the primary of the server.
+ TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();
+ saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
+ new SaslRpcServer.SaslGssCallbackHandler());
+
+ if (null != params.getSecretManager()) {
+ log.info("Adding DIGEST-MD5 server definition for delegation tokens");
+ saslTransportFactory.addServerDefinition(ThriftUtil.DIGEST_MD5, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
+ new SaslServerDigestCallbackHandler(params.getSecretManager()));
+ } else {
+ log.info("SecretManager is null, not adding support for delegation token authentication");
+ }
+
+ // Make sure the TTransportFactory is performing a UGI.doAs
+ TTransportFactory ugiTransportFactory = new UGIAssumingTransportFactory(saslTransportFactory, serverUser);
+
+ if (address.getPort() == 0) {
+ // If we chose a port dynamically, build a new address that uses it (along with the proper hostname)
+ address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+ log.info("SASL thrift server bound on {}", address);
+ }
+
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+
+ final TThreadPoolServer server = createTThreadPoolServer(transport, processor, ugiTransportFactory, protocolFactory, pool);
+
+ return new ServerAddress(server, address);
+ }
+
+ /**
+ * Start a Thrift server for the given processor after wrapping it in a {@link TimedProcessor} built from <code>conf</code>, <code>serverName</code> and
+ * <code>threadName</code>. When <code>serverType</code> is {@link ThriftServerType#SASL} the processor is first wrapped by
+ * <code>updateSaslProcessor</code>.
+ *
+ * @return the ServerAddress produced by the delegate <code>startTServer</code> overload
+ * @throws TTransportException
+ * propagated from the delegate <code>startTServer</code> overload
+ */
+ public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, ThriftServerType serverType, TProcessor processor,
+ String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+ SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+
+ // The SASL wrapping must happen before the TimedProcessor is created so the TimedProcessor ends up outermost
+ if (ThriftServerType.SASL == serverType) {
+ processor = updateSaslProcessor(serverType, processor);
+ }
+
+ return startTServer(address, serverType, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads,
+ timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, serverSocketTimeout);
+ }
+
+ /**
+ * Convenience overload which starts the server using the protocol factory returned by {@link ThriftUtil#protocolFactory()}; every other argument is passed
+ * through unchanged.
+ *
+ * @see #startTServer(HostAndPort, ThriftServerType, TimedProcessor, TProtocolFactory, String, String, int, int, long, long, SslConnectionParams,
+ * SaslServerConnectionParams, long)
+ */
+ public static ServerAddress startTServer(HostAndPort address, ThriftServerType serverType, TimedProcessor processor, String serverName, String threadName,
+ int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+ SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+ return startTServer(address, serverType, processor, ThriftUtil.protocolFactory(), serverName, threadName, numThreads, numSTThreads,
+ timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, serverSocketTimeout);
+ }
+
+ /**
+ * Start the Thrift server selected by <code>serverType</code>: an SSL thread pool server, a SASL thread pool server, an unsecure TThreadPoolServer
+ * (<code>THREADPOOL</code>), or, for <code>CUSTOM_HS_HA</code> and any other value, the custom non-blocking server. The created server's
+ * <code>serve()</code> call is run on a new thread named <code>threadName</code>.
+ * <p>
+ * Supplying both <code>sslParams</code> and <code>saslParams</code> is rejected. If the bound host is <code>0.0.0.0</code>, the returned address carries the
+ * local host name instead.
+ *
+ * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to.
+ * @throws TTransportException
+ * if a server could not be created, or if the local host name cannot be resolved when bound to <code>0.0.0.0</code>
+ */
+ public static ServerAddress startTServer(HostAndPort address, ThriftServerType serverType, TimedProcessor processor, TProtocolFactory protocolFactory,
+ String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+ SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+
+ // This is presently not supported. It's hypothetically possible, I believe, to work, but it would require changes in how the transports
+ // work at the Thrift layer to ensure that both the SSL and SASL handshakes function. SASL's quality of protection addresses privacy issues.
+ checkArgument(!(sslParams != null && saslParams != null), "Cannot start a Thrift server using both SSL and SASL");
+
+ // Build (but do not yet serve) the server implementation matching the requested type
+ ServerAddress serverAddress;
+ switch (serverType) {
+ case SSL:
+ log.debug("Instantiating SSL Thrift server");
+ serverAddress = createSslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout, sslParams, serverName, numThreads, numSTThreads,
+ timeBetweenThreadChecks);
+ break;
+ case SASL:
+ log.debug("Instantiating SASL Thrift server");
+ serverAddress = createSaslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout, saslParams, serverName, threadName, numThreads,
+ numSTThreads, timeBetweenThreadChecks);
+ break;
+ case THREADPOOL:
+ log.debug("Instantiating unsecure TThreadPool Thrift server");
+ serverAddress = createBlockingServer(address, processor, protocolFactory, maxMessageSize, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+ break;
+ case CUSTOM_HS_HA: // Intentional passthrough -- Our custom wrapper around HsHa is the default
+ default:
+ log.debug("Instantiating default, unsecure custom half-async Thrift server");
+ serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks,
+ maxMessageSize);
+ }
+
+ // serve() is invoked from a separate thread; an Error escaping it halts the process with a non-zero exit code
+ final TServer finalServer = serverAddress.server;
+ Runnable serveTask = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ finalServer.serve();
+ } catch (Error e) {
- Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
++ Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1);
+ }
+ }
+ };
+
+ // Wrap the task with this class's logger and start it on a Daemon thread carrying the requested thread name
+ serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
+ Thread thread = new Daemon(serveTask, threadName);
+ thread.start();
+
+ // check for the special "bind to everything address"
+ if (serverAddress.address.getHostText().equals("0.0.0.0")) {
+ // can't get the address from the bind, so we'll do our best to invent our hostname
+ try {
+ serverAddress = new ServerAddress(finalServer, HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort()));
+ } catch (UnknownHostException e) {
+ // Surface the lookup failure through the declared exception type
+ throw new TTransportException(e);
+ }
+ }
+ return serverAddress;
+ }
+
+ /**
+ * Stop a Thrift TServer. Existing connections will keep our thread running; use reflection to forcibly shut down the threadpool.
+ * <p>
+ * A <code>null</code> server is ignored. Any failure while reflectively reaching the executor is logged and not rethrown.
+ *
+ * @param s
+ * The TServer to stop
+ */
+ public static void stopTServer(TServer s) {
+ if (s == null)
+ return;
+ s.stop();
+ try {
+ // getDeclaredField only inspects the runtime class of s itself (not its superclasses), so the lookup fails, and is logged below, for a server
+ // class that does not itself declare an "executorService_" field
+ Field f = s.getClass().getDeclaredField("executorService_");
+ f.setAccessible(true);
+ ExecutorService es = (ExecutorService) f.get(s);
+ es.shutdownNow();
+ } catch (Exception e) {
+ log.error("Unable to call shutdownNow", e);
+ }
+ }
+
+ /**
+ * Wrap the provided processor in the {@link UGIAssumingProcessor} so Kerberos authentication works. Requires the <code>serverType</code> to be
+ * {@link ThriftServerType#SASL} and throws an exception when it is not.
+ *
+ * @param serverType
+ * The type of Thrift server being started; must be {@link ThriftServerType#SASL}
+ * @param processor
+ * The processor to wrap
+ * @return A {@link UGIAssumingProcessor} which wraps the provided processor
+ * @throws IllegalArgumentException
+ * if <code>serverType</code> is not {@link ThriftServerType#SASL} (raised by <code>checkArgument</code>)
+ */
+ private static TProcessor updateSaslProcessor(ThriftServerType serverType, TProcessor processor) {
+ checkArgument(ThriftServerType.SASL == serverType);
+
+ // Wrap the provided processor in our special processor which proxies the provided UGI on the logged-in UGI
+ // Important that we have Timed -> UGIAssuming -> [provided] to make sure that the metrics are still reported
+ // as the logged-in user.
+ log.info("Wrapping {} in UGIAssumingProcessor", processor.getClass());
+
+ return new UGIAssumingProcessor(processor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index a3b224f,8155ea6..a7abe05
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -2370,10 -3224,10 +2370,10 @@@ public class TabletServer extends Accum
@Override
public void unableToMonitorLockNode(final Throwable e) {
- Halt.halt(0, new Runnable() {
+ Halt.halt(1, new Runnable() {
@Override
public void run() {
- log.fatal("Lost ability to monitor tablet server lock, exiting.", e);
+ log.error("Lost ability to monitor tablet server lock, exiting.", e);
}
});
http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index c7b6c98,158fdbd..bb8ae6f
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -213,30 -192,16 +213,30 @@@ public class TabletServerLogger
this.createTime = System.currentTimeMillis();
return;
} catch (Exception t) {
- throw new RuntimeException(t);
- }
- }
+ if (null == retry) {
+ retry = retryFactory.create();
+ }
- public void resetLoggers() throws IOException {
- logSetLock.writeLock().lock();
- try {
- close();
- } finally {
- logSetLock.writeLock().unlock();
+ // We have more retries or we exceeded the maximum number of accepted failures
+ if (retry.canRetry()) {
+ // Use the retry and record the time in which we did so
+ retry.useRetry();
+
+ try {
+ // Backoff
+ retry.waitForNextAttempt();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ } else {
+ log.error("Repeatedly failed to create WAL. Going to exit tabletserver.", t);
+ // We didn't have retries or we failed too many times.
- Halt.halt("Experienced too many errors creating WALs, giving up");
++ Halt.halt("Experienced too many errors creating WALs, giving up", 1);
+ }
+
+ // The exception will trigger the log creation to be re-attempted.
+ throw new RuntimeException(t);
}
}
[10/10] accumulo git commit: Merge branch '1.8'
Posted by el...@apache.org.
Merge branch '1.8'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/64547abe
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/64547abe
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/64547abe
Branch: refs/heads/master
Commit: 64547abe85b9e4aadab6aad7ec6208ff9e52f63b
Parents: 87fce25 cb25e77
Author: Josh Elser <el...@apache.org>
Authored: Thu Jun 9 18:40:51 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jun 9 18:40:51 2016 -0400
----------------------------------------------------------------------
----------------------------------------------------------------------
[02/10] accumulo git commit: ACCUMULO-4335 Error conditions that
result in a Halt should ensure non-zero exit code.
Posted by el...@apache.org.
ACCUMULO-4335 Error conditions that result in a Halt should ensure non-zero exit code.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f99ac9b3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f99ac9b3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f99ac9b3
Branch: refs/heads/1.7
Commit: f99ac9b31cb2194d7b6fa2debc7e018a4410afe7
Parents: 60ac92d
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Jun 8 10:56:44 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jun 9 18:39:00 2016 -0400
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/server/util/TServerUtils.java | 2 +-
.../main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java | 2 +-
.../src/main/java/org/apache/accumulo/tserver/TabletServer.java | 4 ++--
3 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f99ac9b3/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index d932b2a..a84cf15 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@ -299,7 +299,7 @@ public class TServerUtils {
try {
finalServer.serve();
} catch (Error e) {
- Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
+ Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1);
}
}
};
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f99ac9b3/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index b98daf7..6d50e94 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -678,7 +678,7 @@ public class SimpleGarbageCollector implements Iface {
LockWatcher lockWatcher = new LockWatcher() {
@Override
public void lostLock(LockLossReason reason) {
- Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!");
+ Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f99ac9b3/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 955c365..8155ea6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3212,7 +3212,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
@Override
public void lostLock(final LockLossReason reason) {
- Halt.halt(0, new Runnable() {
+ Halt.halt(serverStopRequested ? 0 : 1, new Runnable() {
@Override
public void run() {
if (!serverStopRequested)
@@ -3224,7 +3224,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
@Override
public void unableToMonitorLockNode(final Throwable e) {
- Halt.halt(0, new Runnable() {
+ Halt.halt(1, new Runnable() {
@Override
public void run() {
log.fatal("Lost ability to monitor tablet server lock, exiting.", e);
[04/10] accumulo git commit: ACCUMULO-4335 Error conditions that
result in a Halt should ensure non-zero exit code.
Posted by el...@apache.org.
ACCUMULO-4335 Error conditions that result in a Halt should ensure non-zero exit code.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f99ac9b3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f99ac9b3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f99ac9b3
Branch: refs/heads/master
Commit: f99ac9b31cb2194d7b6fa2debc7e018a4410afe7
Parents: 60ac92d
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Jun 8 10:56:44 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jun 9 18:39:00 2016 -0400
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/server/util/TServerUtils.java | 2 +-
.../main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java | 2 +-
.../src/main/java/org/apache/accumulo/tserver/TabletServer.java | 4 ++--
3 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f99ac9b3/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index d932b2a..a84cf15 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@ -299,7 +299,7 @@ public class TServerUtils {
try {
finalServer.serve();
} catch (Error e) {
- Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
+ Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1);
}
}
};
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f99ac9b3/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index b98daf7..6d50e94 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -678,7 +678,7 @@ public class SimpleGarbageCollector implements Iface {
LockWatcher lockWatcher = new LockWatcher() {
@Override
public void lostLock(LockLossReason reason) {
- Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!");
+ Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!", 1);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f99ac9b3/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 955c365..8155ea6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3212,7 +3212,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
@Override
public void lostLock(final LockLossReason reason) {
- Halt.halt(0, new Runnable() {
+ Halt.halt(serverStopRequested ? 0 : 1, new Runnable() {
@Override
public void run() {
if (!serverStopRequested)
@@ -3224,7 +3224,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
@Override
public void unableToMonitorLockNode(final Throwable e) {
- Halt.halt(0, new Runnable() {
+ Halt.halt(1, new Runnable() {
@Override
public void run() {
log.fatal("Lost ability to monitor tablet server lock, exiting.", e);
[05/10] accumulo git commit: Merge branch '1.6' into 1.7
Posted by el...@apache.org.
Merge branch '1.6' into 1.7
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/462dfbc3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/462dfbc3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/462dfbc3
Branch: refs/heads/1.8
Commit: 462dfbc3b0e323552f33d185d63daa8a1833d6d7
Parents: 4dabc2b f99ac9b
Author: Josh Elser <el...@apache.org>
Authored: Thu Jun 9 18:40:34 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jun 9 18:40:34 2016 -0400
----------------------------------------------------------------------
.../main/java/org/apache/accumulo/server/rpc/TServerUtils.java | 2 +-
.../main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java | 2 +-
.../src/main/java/org/apache/accumulo/tserver/TabletServer.java | 4 ++--
.../java/org/apache/accumulo/tserver/log/TabletServerLogger.java | 2 +-
4 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 77d6412,0000000..08ef944
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@@ -1,577 -1,0 +1,577 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import javax.net.ssl.SSLServerSocket;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.util.LoggingRunnable;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Factory methods for creating Thrift server objects
+ */
+public class TServerUtils {
+ private static final Logger log = LoggerFactory.getLogger(TServerUtils.class);
+
+ /**
+ * Static instance, passed to {@link ClientInfoProcessorFactory}, which will contain the client address of any incoming RPC.
+ */
+ public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
+
+ /**
+ * Start a server on the hinted port, optionally searching for another port if that one is not available.
+ * <p>
+ * The first attempt of each round uses the hinted port. When port search is enabled, up to 1000 attempts are made per round; for a non-zero hint the later
+ * attempts use a randomly chosen port in the range 1024 to 65534. Up to 100 rounds are made before giving up.
+ *
+ * @param service
+ * RPC configuration
+ * @param hostname
+ * the host name used to build the address handed to the Thrift server
+ * @param portHintProperty
+ * the port to attempt to open, can be zero, meaning "any available port"
+ * @param processor
+ * the service to be started
+ * @param serverName
+ * the name of the class that is providing the service
+ * @param threadName
+ * name this service's thread for better debugging
+ * @param portSearchProperty
+ * A boolean Property to control if port-search should be used, or null to disable
+ * @param minThreadProperty
+ * A Property to control the minimum number of threads in the pool, or null to use 2
+ * @param timeBetweenThreadChecksProperty
+ * A Property to control the amount of time between checks to resize the thread pool, or null to use 1000 milliseconds
+ * @param maxMessageSizeProperty
+ * A Property to control the maximum Thrift message size accepted, or null to use 10 * 1000 * 1000 bytes
+ * @return the server object created, and the port actually used
+ * @throws UnknownHostException
+ * when no attempt managed to start the server ("Unable to find a listen port")
+ */
+ public static ServerAddress startServer(AccumuloServerContext service, String hostname, Property portHintProperty, TProcessor processor, String serverName,
+ String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
+ throws UnknownHostException {
+ final AccumuloConfiguration config = service.getConfiguration();
+
+ final int portHint = config.getPort(portHintProperty);
+
+ // Each optional Property overrides a built-in default only when it is non-null
+ int minThreads = 2;
+ if (minThreadProperty != null)
+ minThreads = config.getCount(minThreadProperty);
+
+ long timeBetweenThreadChecks = 1000;
+ if (timeBetweenThreadChecksProperty != null)
+ timeBetweenThreadChecks = config.getTimeInMillis(timeBetweenThreadChecksProperty);
+
+ long maxMessageSize = 10 * 1000 * 1000;
+ if (maxMessageSizeProperty != null)
+ maxMessageSize = config.getMemoryInBytes(maxMessageSizeProperty);
+
+ boolean portSearch = false;
+ if (portSearchProperty != null)
+ portSearch = config.getBoolean(portSearchProperty);
+
+ final int simpleTimerThreadpoolSize = config.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE);
+ final ThriftServerType serverType = service.getThriftServerType();
+
+ if (ThriftServerType.SASL == serverType) {
+ processor = updateSaslProcessor(serverType, processor);
+ }
+
+ // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
+ TimedProcessor timedProcessor = new TimedProcessor(config, processor, serverName, threadName);
+
+ Random random = new Random();
+ for (int j = 0; j < 100; j++) {
+
+ // Are we going to slide around, looking for an open port?
+ int portsToSearch = 1;
+ if (portSearch)
+ portsToSearch = 1000;
+
+ for (int i = 0; i < portsToSearch; i++) {
+ // First attempt: the hint itself. Later attempts with a non-zero hint: a random port in [1024, 65534]
+ int port = portHint + i;
+ if (portHint != 0 && i > 0)
+ port = 1024 + random.nextInt(65535 - 1024);
+ // Fold any out-of-range candidate back into the valid port range
+ if (port > 65535)
+ port = 1024 + port % (65535 - 1024);
+ try {
+ HostAndPort addr = HostAndPort.fromParts(hostname, port);
+ return TServerUtils.startTServer(addr, serverType, timedProcessor, serverName, threadName, minThreads, simpleTimerThreadpoolSize,
+ timeBetweenThreadChecks, maxMessageSize, service.getServerSslParams(), service.getSaslParams(), service.getClientTimeoutInMillis());
+ } catch (TTransportException ex) {
+ // NOTE(review): this logs every failure at error level, including the "port taken" case handled below, and the non-bind branch logs the same
+ // message a second time
+ log.error("Unable to start TServer", ex);
+ if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
+ // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less
+ // TTransportException, and with a TSocket created by TSSLTransportFactory, it
+ // comes through as caused by a BindException.
+ log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName);
+ UtilWaitThread.sleep(250);
+ } else {
+ // thrift is passing up a nested exception that isn't a BindException,
+ // so no reason to believe retrying on a different port would help.
+ log.error("Unable to start TServer", ex);
+ // NOTE(review): this only leaves the inner port loop; the outer loop then starts another round
+ break;
+ }
+ }
+ }
+ }
+ throw new UnknownHostException("Unable to find a listen port");
+ }
+
+ /**
+ * Create a NonBlockingServer with a custom thread pool that can dynamically resize itself.
+ * <p>
+ * The server socket is bound to the host and port of <code>address</code>. If the requested port is zero, the returned address carries the port reported
+ * by the transport instead.
+ *
+ * @param address
+ * Host and port to bind to
+ * @param processor
+ * TProcessor for the server
+ * @param maxMessageSize
+ * Used both for the transport factory and as the server's maximum read buffer size
+ * @return The (not yet serving) server together with the address it is bound to
+ */
+ public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, final String serverName,
+ String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
+
+ final TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
+ final CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);
+
+ options.protocolFactory(protocolFactory);
+ options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+ options.maxReadBufferBytes = maxMessageSize;
+ // NOTE(review): presumably the stop timeout in the server's default time unit -- confirm against the Thrift Args documentation
+ options.stopTimeoutVal(5);
+
+ // Create our own very special thread pool.
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+
+ options.executorService(pool);
+ options.processorFactory(new TProcessorFactory(processor));
+
+ // A port of zero was requested, so report the port the transport actually holds
+ if (address.getPort() == 0) {
+ address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
+ }
+
+ return new ServerAddress(new CustomNonBlockingServer(options), address);
+ }
+
+ /**
+ * Creates a {@link SimpleThreadPool} which uses {@link SimpleTimer} to inspect the core pool size and number of active threads of the
+ * {@link ThreadPoolExecutor} and increase or decrease the core pool size based on activity (excessive or lack thereof).
+ *
+ * @param serverName
+ * A name to describe the thrift server this executor will service
+ * @param executorThreads
+ * The thread count handed to the {@link SimpleThreadPool} constructor; also the floor below which the core pool size is never shrunk
+ * @param simpleTimerThreads
+ * The numbers of threads used to get the {@link SimpleTimer} instance
+ * @param timeBetweenThreadChecks
+ * The amount of time, in millis, between attempts to resize the executor thread pool
+ * @return A {@link ThreadPoolExecutor} which will resize itself automatically
+ */
+ public static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, final int executorThreads, int simpleTimerThreads,
+ long timeBetweenThreadChecks) {
+ final ThreadPoolExecutor pool = new SimpleThreadPool(executorThreads, "ClientPool");
+ // periodically adjust the number of threads we need by checking how busy our threads are
+ SimpleTimer.getInstance(simpleTimerThreads).schedule(new Runnable() {
+ @Override
+ public void run() {
+ // there is a minor race condition between sampling the current state of the thread pool and adjusting it
+ // however, this isn't really an issue, since it adjusts periodically anyway
+ if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+ // Every core thread is busy: grow by the number of queued tasks, but by at most two threads per check
+ int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
+ log.info("Increasing server thread pool size on {} to {}", serverName, larger);
+ pool.setMaximumPoolSize(larger);
+ pool.setCorePoolSize(larger);
+ } else {
+ // More than three core threads are idle: shrink by one thread per check, never below executorThreads
+ if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+ int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
+ if (smaller != pool.getCorePoolSize()) {
+ log.info("Decreasing server thread pool size on {} to {}", serverName, smaller);
+ pool.setCorePoolSize(smaller);
+ }
+ }
+ }
+ }
+ }, timeBetweenThreadChecks, timeBetweenThreadChecks);
+ return pool;
+ }
+
+ /**
+ * Creates a TThreadPoolServer for normal unsecure operation. Useful for comparing performance against SSL or SASL transports.
+ * <p>
+ * The server socket is bound to both the host and the port of <code>address</code>, matching what <code>createNonBlockingServer</code> does. If the
+ * requested port is zero, the returned address carries the local port of the bound socket instead.
+ *
+ * @param address
+ * Address (host and port) to bind to
+ * @param processor
+ * TProcessor for the server
+ * @param protocolFactory
+ * Protocol factory for the server
+ * @param maxMessageSize
+ * Maximum size of a Thrift message allowed
+ * @param serverName
+ * A name to describe the thrift server, used when creating the self-resizing thread pool
+ * @param numThreads
+ * Number of threads for the self-resizing thread pool
+ * @param numSimpleTimerThreads
+ * Number of threads used to get the SimpleTimer instance which resizes the pool
+ * @param timeBetweenThreadChecks
+ * The amount of time, in millis, between attempts to resize the thread pool
+ * @return A configured TThreadPoolServer and its bound address information
+ * @throws TTransportException
+ * if the server socket cannot be created
+ */
+ public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long maxMessageSize,
+ String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException {
+
+ // Must bind with an InetSocketAddress: constructing the TServerSocket from the port alone would ignore the host given in the address
+ TServerSocket transport = new TServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
+ TThreadPoolServer server = createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize), protocolFactory, pool);
+
+ // A port of zero was requested, so report the port that was actually bound
+ if (address.getPort() == 0) {
+ address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+ }
+
+ return new ServerAddress(server, address);
+ }
+
+ /**
+ * Create a {@link TThreadPoolServer} with the provided transport, processor and transport factory. Delegates to the five-argument overload with a
+ * <code>null</code> executor service, so no executor is set explicitly on the server arguments.
+ *
+ * @param transport
+ * Server transport
+ * @param processor
+ * Processor implementation
+ * @param transportFactory
+ * Transport factory
+ * @param protocolFactory
+ * Protocol factory
+ * @return A configured {@link TThreadPoolServer}
+ */
+ public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory) {
+ return createTThreadPoolServer(transport, processor, transportFactory, protocolFactory, null);
+ }
+
+ /**
+ * Create a {@link TThreadPoolServer} with the provided server transport, processor and transport factory.
+ *
+ * @param transport
+ * TServerTransport for the server
+ * @param processor
+ * TProcessor for the server
+ * @param transportFactory
+ * TTransportFactory for the server
+ * @param protocolFactory
+ * TProtocolFactory for the server
+ * @param service
+ * ExecutorService for the server to use, or <code>null</code> to leave the executor on the server arguments unset
+ * @return A configured {@link TThreadPoolServer}
+ */
+ public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory,
+ TProtocolFactory protocolFactory, ExecutorService service) {
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.protocolFactory(protocolFactory);
+ options.transportFactory(transportFactory);
+ // The processor factory is handed the shared clientAddress thread-local along with the processor
+ options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor));
+ if (null != service) {
+ options.executorService(service);
+ }
+ return new TThreadPoolServer(options);
+ }
+
+ /**
+  * Create the Thrift server socket for RPC running over SSL.
+  * <p>
+  * When the underlying socket is an {@link SSLServerSocket}, its enabled protocols are narrowed to those that are both enabled on the socket and listed by
+  * <code>params.getServerProtocols()</code>. If that intersection is empty, the socket is closed and a {@link RuntimeException} is thrown.
+  *
+  * @param port
+  *          Port of the server socket to bind to
+  * @param timeout
+  *          Socket timeout
+  * @param address
+  *          Address to bind the socket to
+  * @param params
+  *          SSL parameters
+  * @return A configured TServerSocket configured to use SSL
+  * @throws TTransportException
+  *           if the SSL server socket cannot be created
+  * @throws RuntimeException
+  *           if none of the configured server protocols is enabled on the socket
+  */
+ public static TServerSocket getSslServerSocket(int port, int timeout, InetAddress address, SslConnectionParams params) throws TTransportException {
+   final TServerSocket tServerSock;
+   if (params.useJsse()) {
+     tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address);
+   } else {
+     tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address, params.getTTransportParams());
+   }
+
+   final ServerSocket serverSock = tServerSock.getServerSocket();
+   if (serverSock instanceof SSLServerSocket) {
+     SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
+     String[] protocols = params.getServerProtocols();
+     // Capture the socket's protocols up front so they can still be reported after the socket is closed
+     String[] enabledOnSocket = sslServerSock.getEnabledProtocols();
+
+     // Be nice for the user and automatically remove protocols that might not exist in their JVM. Keeps us from forcing config alterations too
+     // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6
+     Set<String> socketEnabledProtocols = new HashSet<String>(Arrays.asList(enabledOnSocket));
+     // Keep only the enabled protocols that were specified by the configuration
+     socketEnabledProtocols.retainAll(Arrays.asList(protocols));
+     if (socketEnabledProtocols.isEmpty()) {
+       // Bad configuration... release the socket we already created before failing so it is not leaked
+       tServerSock.close();
+       throw new RuntimeException("No available protocols available for secure socket. Available protocols: " + Arrays.toString(enabledOnSocket)
+           + ", allowed protocols: " + Arrays.toString(protocols));
+     }
+
+     // Set the protocol(s) on the server socket
+     sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0]));
+   }
+
+   return tServerSock;
+ }
+
+ /**
+  * Create a Thrift SSL server backed by a thread pool.
+  * <p>
+  * The host in <code>address</code> is resolved, an SSL server socket is obtained from <code>getSslServerSocket</code>, and a {@link TThreadPoolServer} is
+  * built on top of it. If the requested port is 0, the returned address carries the local port of the created socket instead.
+  *
+  * @param address
+  *          host and port to bind to
+  * @param processor
+  *          TProcessor for the server
+  * @param protocolFactory
+  *          TProtocolFactory for the server
+  * @param socketTimeout
+  *          Socket timeout; cast to an <code>int</code> before being given to the socket
+  * @param sslParams
+  *          SSL parameters
+  * @param serverName
+  *          passed to <code>createSelfResizingThreadPool</code>
+  * @param numThreads
+  *          passed to <code>createSelfResizingThreadPool</code>
+  * @param numSimpleTimerThreads
+  *          passed to <code>createSelfResizingThreadPool</code>
+  * @param timeBetweenThreadChecks
+  *          passed to <code>createSelfResizingThreadPool</code>
+  * @return A ServerAddress with the bound-socket information and the Thrift server
+  * @throws TTransportException
+  *           if the host cannot be resolved or the SSL socket cannot be created
+  */
+ public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout,
+     SslConnectionParams sslParams, String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException {
+   final int requestedPort = address.getPort();
+   final int timeoutAsInt = (int) socketTimeout;
+
+   final InetAddress bindAddress;
+   try {
+     bindAddress = InetAddress.getByName(address.getHostText());
+   } catch (UnknownHostException e) {
+     throw new TTransportException(e);
+   }
+   final TServerSocket sslSocket = getSslServerSocket(requestedPort, timeoutAsInt, bindAddress, sslParams);
+
+   // Port 0 means the port was chosen at bind time; report the one actually in use
+   HostAndPort boundAddress = address;
+   if (0 == boundAddress.getPort()) {
+     boundAddress = HostAndPort.fromParts(boundAddress.getHostText(), sslSocket.getServerSocket().getLocalPort());
+   }
+
+   final ThreadPoolExecutor executor = createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
+   final TThreadPoolServer sslServer = createTThreadPoolServer(sslSocket, processor, ThriftUtil.transportFactory(), protocolFactory, executor);
+
+   return new ServerAddress(sslServer, boundAddress);
+ }
+
+ /**
+  * Create a SASL-enabled Thrift server backed by a thread pool.
+  * <p>
+  * The server socket is created first. The canonical hostname of <code>address</code> is then compared against this host's FQDN (a canonical name of
+  * "0.0.0.0" is replaced by the FQDN), GSSAPI and, when a secret manager is present, DIGEST-MD5 server definitions are registered, and the resulting
+  * transport factory is wrapped so it runs as the logged-in user. If any step after the socket has been created fails, the socket is closed before the
+  * failure propagates.
+  *
+  * @param address
+  *          host and port to bind to; if the port is 0 the returned address carries the local port of the created socket
+  * @param processor
+  *          TProcessor for the server
+  * @param protocolFactory
+  *          TProtocolFactory for the server
+  * @param socketTimeout
+  *          Socket timeout; cast to an <code>int</code> before being given to the socket
+  * @param params
+  *          SASL parameters (Kerberos server primary, SASL properties, optional secret manager)
+  * @param serverName
+  *          passed to <code>createSelfResizingThreadPool</code>
+  * @param threadName
+  *          not used by this method
+  * @param numThreads
+  *          passed to <code>createSelfResizingThreadPool</code>
+  * @param numSTThreads
+  *          passed to <code>createSelfResizingThreadPool</code>
+  * @param timeBetweenThreadChecks
+  *          passed to <code>createSelfResizingThreadPool</code>
+  * @return A ServerAddress with the bound-socket information and the Thrift server
+  * @throws TTransportException
+  *           if the socket cannot be created, a hostname cannot be resolved, or the login user cannot be obtained
+  * @throws RuntimeException
+  *           if the hostname derived from <code>address</code> does not match this host's FQDN
+  */
+ public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout,
+     SaslServerConnectionParams params, final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks)
+     throws TTransportException {
+   // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the TThreadPoolServer does,
+   // but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when it open()'s which will fail
+   // when the server does an accept() to (presumably) wake up the eventing system.
+   log.info("Creating SASL thread pool thrift server on listening on {}:{}", address.getHostText(), address.getPort());
+   final TServerSocket transport = new TServerSocket(address.getPort(), (int) socketTimeout);
+
+   // The socket exists from here on; every failure below must release it, otherwise it would stay open with nobody serving it.
+   boolean created = false;
+   try {
+     String hostname, fqdn;
+     try {
+       hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName();
+       fqdn = InetAddress.getLocalHost().getCanonicalHostName();
+     } catch (UnknownHostException e) {
+       throw new TTransportException(e);
+     }
+
+     // If we can't get a real hostname from the provided host text, use the hostname from DNS for localhost
+     if ("0.0.0.0".equals(hostname)) {
+       hostname = fqdn;
+     }
+
+     // ACCUMULO-3497 an easy sanity check we can perform for the user when SASL is enabled. Clients and servers have to agree upon the FQDN
+     // so that the SASL handshake can occur. If the provided hostname doesn't match the FQDN for this host, fail quickly and inform them to update
+     // their configuration.
+     if (!hostname.equals(fqdn)) {
+       log.error(
+           "Expected hostname of '{}' but got '{}'. Ensure the entries in the Accumulo hosts files (e.g. masters, slaves) are the FQDN for each host when using SASL.",
+           fqdn, hostname);
+       throw new RuntimeException("SASL requires that the address the thrift server listens on is the same as the FQDN for this host");
+     }
+
+     final UserGroupInformation serverUser;
+     try {
+       serverUser = UserGroupInformation.getLoginUser();
+     } catch (IOException e) {
+       throw new TTransportException(e);
+     }
+
+     log.debug("Logged in as {}, creating TSaslServerTransport factory with {}/{}", serverUser, params.getKerberosServerPrimary(), hostname);
+
+     // Make the SASL transport factory with the instance and primary from the kerberos server principal, SASL properties
+     // and the SASL callback handler from Hadoop to ensure authorization ID is the authentication ID. Despite the 'protocol' argument seeming to be useless, it
+     // *must* be the primary of the server.
+     TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();
+     saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
+         new SaslRpcServer.SaslGssCallbackHandler());
+
+     if (null != params.getSecretManager()) {
+       log.info("Adding DIGEST-MD5 server definition for delegation tokens");
+       saslTransportFactory.addServerDefinition(ThriftUtil.DIGEST_MD5, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
+           new SaslServerDigestCallbackHandler(params.getSecretManager()));
+     } else {
+       log.info("SecretManager is null, not adding support for delegation token authentication");
+     }
+
+     // Make sure the TTransportFactory is performing a UGI.doAs
+     TTransportFactory ugiTransportFactory = new UGIAssumingTransportFactory(saslTransportFactory, serverUser);
+
+     if (address.getPort() == 0) {
+       // The port was chosen dynamically: build a new address that carries the socket's actual local port
+       address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
+       log.info("SASL thrift server bound on {}", address);
+     }
+
+     ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+
+     final TThreadPoolServer server = createTThreadPoolServer(transport, processor, ugiTransportFactory, protocolFactory, pool);
+
+     final ServerAddress result = new ServerAddress(server, address);
+     created = true;
+     return result;
+   } finally {
+     if (!created) {
+       transport.close();
+     }
+   }
+ }
+
+ /**
+  * Start a Thrift server for a plain processor. The processor is wrapped in a <code>TimedProcessor</code> (built from <code>conf</code>,
+  * <code>serverName</code> and <code>threadName</code>) and the call is forwarded to the <code>TimedProcessor</code>-based overload. When
+  * <code>serverType</code> is {@link ThriftServerType#SASL}, the processor is first passed through <code>updateSaslProcessor</code>.
+  *
+  * @return The ServerAddress returned by the overload this method delegates to
+  * @throws TTransportException
+  *           propagated from the overload this method delegates to
+  */
+ public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, ThriftServerType serverType, TProcessor processor,
+     String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+     SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+
+   // Only SASL servers get the extra wrapping; every other server type uses the processor as given
+   final TProcessor effectiveProcessor = (ThriftServerType.SASL == serverType) ? updateSaslProcessor(serverType, processor) : processor;
+   final TimedProcessor timedProcessor = new TimedProcessor(conf, effectiveProcessor, serverName, threadName);
+
+   return startTServer(address, serverType, timedProcessor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize,
+       sslParams, saslParams, serverSocketTimeout);
+ }
+
+ /**
+  * Start a Thrift server using the protocol factory returned by <code>ThriftUtil.protocolFactory()</code>. All other arguments are forwarded unchanged.
+  *
+  * @see #startTServer(HostAndPort, ThriftServerType, TimedProcessor, TProtocolFactory, String, String, int, int, long, long, SslConnectionParams,
+  *      SaslServerConnectionParams, long)
+  */
+ public static ServerAddress startTServer(HostAndPort address, ThriftServerType serverType, TimedProcessor processor, String serverName, String threadName,
+     int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+     SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+   final TProtocolFactory defaultProtocolFactory = ThriftUtil.protocolFactory();
+   return startTServer(address, serverType, processor, defaultProtocolFactory, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks,
+       maxMessageSize, sslParams, saslParams, serverSocketTimeout);
+ }
+
+ /**
+ * Create the Thrift server selected by <code>serverType</code> and start serving on a new <code>Daemon</code> thread named <code>threadName</code>. The
+ * server is one of: SSL thread pool, SASL thread pool, unsecured blocking thread pool, or (for <code>CUSTOM_HS_HA</code> and any other value) the custom
+ * half-async non-blocking server. Supplying both SSL and SASL parameters is rejected.
+ * <p>
+ * An {@link Error} escaping <code>serve()</code> halts the process. If the host of the created server's address is "0.0.0.0", the returned address uses the
+ * local host name instead, keeping the same port.
+ *
+ * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to.
+ * @throws TTransportException
+ *           if creating the server fails, or if the local host name cannot be resolved when the host is "0.0.0.0"
+ */
+ public static ServerAddress startTServer(HostAndPort address, ThriftServerType serverType, TimedProcessor processor, TProtocolFactory protocolFactory,
+ String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+ SaslServerConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
+
+ // This is presently not supported. It's hypothetically possible, I believe, to work, but it would require changes in how the transports
+ // work at the Thrift layer to ensure that both the SSL and SASL handshakes function. SASL's quality of protection addresses privacy issues.
+ checkArgument(!(sslParams != null && saslParams != null), "Cannot start a Thrift server using both SSL and SASL");
+
+ ServerAddress serverAddress;
+ switch (serverType) {
+ case SSL:
+ log.debug("Instantiating SSL Thrift server");
+ serverAddress = createSslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout, sslParams, serverName, numThreads, numSTThreads,
+ timeBetweenThreadChecks);
+ break;
+ case SASL:
+ log.debug("Instantiating SASL Thrift server");
+ serverAddress = createSaslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout, saslParams, serverName, threadName, numThreads,
+ numSTThreads, timeBetweenThreadChecks);
+ break;
+ case THREADPOOL:
+ log.debug("Instantiating unsecure TThreadPool Thrift server");
+ serverAddress = createBlockingServer(address, processor, protocolFactory, maxMessageSize, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+ break;
+ case CUSTOM_HS_HA: // Intentional passthrough -- Our custom wrapper around HsHa is the default
+ default:
+ log.debug("Instantiating default, unsecure custom half-async Thrift server");
+ serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks,
+ maxMessageSize);
+ }
+
+ final TServer finalServer = serverAddress.server;
+ Runnable serveTask = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ finalServer.serve();
+ } catch (Error e) {
- Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
++ Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1);
+ }
+ }
+ };
+
+ serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
+ Thread thread = new Daemon(serveTask, threadName);
+ thread.start();
+
+ // check for the special "bind to everything address"
+ if (serverAddress.address.getHostText().equals("0.0.0.0")) {
+ // can't get the address from the bind, so we'll do our best to invent our hostname
+ try {
+ serverAddress = new ServerAddress(finalServer, HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort()));
+ } catch (UnknownHostException e) {
+ throw new TTransportException(e);
+ }
+ }
+ return serverAddress;
+ }
+
+ /**
+ * Stop a Thrift TServer. Existing connections will keep our thread running; use reflection to forcibly shut down the threadpool.
+ *
+ * @param s
+ * The TServer to stop
+ */
+ public static void stopTServer(TServer s) {
+ if (s == null)
+ return;
+ s.stop();
+ try {
+ Field f = s.getClass().getDeclaredField("executorService_");
+ f.setAccessible(true);
+ ExecutorService es = (ExecutorService) f.get(s);
+ es.shutdownNow();
+ } catch (Exception e) {
+ log.error("Unable to call shutdownNow", e);
+ }
+ }
+
+ /**
+ * Wrap the provided processor in the {@link UGIAssumingProcessor} so Kerberos authentication works. Requires the <code>serverType</code> to be
+ * {@link ThriftServerType#SASL} and throws an exception when it is not.
+ *
+ * @return A {@link UGIAssumingProcessor} which wraps the provided processor
+ */
+ private static TProcessor updateSaslProcessor(ThriftServerType serverType, TProcessor processor) {
+ checkArgument(ThriftServerType.SASL == serverType);
+
+ // Wrap the provided processor in our special processor which proxies the provided UGI on the logged-in UGI
+ // Important that we have Timed -> UGIAssuming -> [provided] to make sure that the metrics are still reported
+ // as the logged-in user.
+ log.info("Wrapping {} in UGIAssumingProcessor", processor.getClass());
+
+ return new UGIAssumingProcessor(processor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index a3b224f,8155ea6..a7abe05
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -2370,10 -3224,10 +2370,10 @@@ public class TabletServer extends Accum
@Override
public void unableToMonitorLockNode(final Throwable e) {
- Halt.halt(0, new Runnable() {
+ Halt.halt(1, new Runnable() {
@Override
public void run() {
- log.fatal("Lost ability to monitor tablet server lock, exiting.", e);
+ log.error("Lost ability to monitor tablet server lock, exiting.", e);
}
});
http://git-wip-us.apache.org/repos/asf/accumulo/blob/462dfbc3/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index c7b6c98,158fdbd..bb8ae6f
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -213,30 -192,16 +213,30 @@@ public class TabletServerLogger
this.createTime = System.currentTimeMillis();
return;
} catch (Exception t) {
- throw new RuntimeException(t);
- }
- }
+ if (null == retry) {
+ retry = retryFactory.create();
+ }
- public void resetLoggers() throws IOException {
- logSetLock.writeLock().lock();
- try {
- close();
- } finally {
- logSetLock.writeLock().unlock();
+ // We have more retries or we exceeded the maximum number of accepted failures
+ if (retry.canRetry()) {
+ // Use the retry and record the time in which we did so
+ retry.useRetry();
+
+ try {
+ // Backoff
+ retry.waitForNextAttempt();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ } else {
+ log.error("Repeatedly failed to create WAL. Going to exit tabletserver.", t);
+ // We didn't have retries or we failed too many times.
- Halt.halt("Experienced too many errors creating WALs, giving up");
++ Halt.halt("Experienced too many errors creating WALs, giving up", 1);
+ }
+
+ // The exception will trigger the log creation to be re-attempted.
+ throw new RuntimeException(t);
}
}