You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2019/04/22 04:46:58 UTC
[accumulo] branch master updated: Create AbstractServer type for
Accumulo servers (#1103)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 02c2cb8 Create AbstractServer type for Accumulo servers (#1103)
02c2cb8 is described below
commit 02c2cb8bcbb3fdd88ffd1c172738909b18032eec
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Mon Apr 22 00:46:52 2019 -0400
Create AbstractServer type for Accumulo servers (#1103)
Create base class to help standardize the way Accumulo servers are
initialized from configuration files, launch, run, and clean up
resources during shutdown.
This establishes the base class as a concept, but over time, more
implementation that servers have in common can be put here, to make
server-side code more maintainable, and so the code in the individual
servers can be limited to the bits that make them different.
---
.../org/apache/accumulo/server/AbstractServer.java | 97 ++++++
.../org/apache/accumulo/server/ServerContext.java | 41 +--
.../java/org/apache/accumulo/gc/GCExecutable.java | 2 +-
.../apache/accumulo/gc/SimpleGarbageCollector.java | 123 +++----
.../gc/SimpleGarbageCollectorOptsTest.java | 6 +-
.../accumulo/gc/SimpleGarbageCollectorTest.java | 20 +-
.../java/org/apache/accumulo/master/Master.java | 214 +++++++------
.../apache/accumulo/monitor/EmbeddedWebServer.java | 2 +-
.../java/org/apache/accumulo/monitor/Monitor.java | 41 +--
.../apache/accumulo/monitor/MonitorExecutable.java | 2 +-
.../org/apache/accumulo/tserver/TabletServer.java | 352 ++++++++++++---------
11 files changed, 496 insertions(+), 404 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
new file mode 100644
index 0000000..6c894f6
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.server.metrics.MetricsSystemHelper;
+import org.apache.accumulo.server.security.SecurityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractServer implements AutoCloseable, Runnable {
+
+ private final ServerContext context;
+ private final String applicationName;
+ private final String hostname;
+ private final Logger log;
+
+ protected AbstractServer(String appName, ServerOpts opts, String[] args) {
+ this.log = LoggerFactory.getLogger(getClass().getName());
+ this.applicationName = appName;
+ this.hostname = Objects.requireNonNull(opts.getAddress());
+ opts.parseArgs(appName, args);
+ SiteConfiguration siteConfig = opts.getSiteConfiguration();
+ context = new ServerContext(siteConfig);
+ SecurityUtil.serverLogin(siteConfig);
+ log.info("Version " + Constants.VERSION);
+ log.info("Instance " + context.getInstanceID());
+ ServerUtil.init(context, appName);
+ MetricsSystemHelper.configure(getClass().getName());
+ TraceUtil.enableServerTraces(hostname, appName, context.getConfiguration());
+ if (context.getSaslParams() != null) {
+ // Server-side "client" check to make sure we're logged in as a user we expect to be
+ context.enforceKerberosLogin();
+ }
+ }
+
+ /**
+ * Run this server in a main thread
+ */
+ public void runServer() throws Exception {
+ final AtomicReference<Throwable> err = new AtomicReference<>();
+ Thread service = new Thread(this, applicationName);
+ service.setUncaughtExceptionHandler((thread, exception) -> {
+ err.set(exception);
+ });
+ service.start();
+ service.join();
+ Throwable thrown = err.get();
+ if (thrown != null) {
+ if (thrown instanceof Error) {
+ throw (Error) thrown;
+ }
+ if (thrown instanceof Exception) {
+ throw (Exception) thrown;
+ }
+ throw new RuntimeException("Weird throwable type thrown", thrown);
+ }
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public ServerContext getContext() {
+ return context;
+ }
+
+ public AccumuloConfiguration getConfiguration() {
+ return getContext().getConfiguration();
+ }
+
+ @Override
+ public void close() {
+ TraceUtil.disable();
+ }
+
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index 3a61fe6..ac9415a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -19,10 +19,8 @@ package org.apache.accumulo.server;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
-import java.util.Objects;
import java.util.Properties;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -32,11 +30,9 @@ import org.apache.accumulo.core.crypto.CryptoServiceFactory;
import org.apache.accumulo.core.crypto.CryptoServiceFactory.ClassloaderType;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.spi.crypto.CryptoService;
-import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.metrics.MetricsSystemHelper;
import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.security.SecurityUtil;
@@ -44,8 +40,6 @@ import org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretM
import org.apache.accumulo.server.tables.TableManager;
import org.apache.accumulo.server.tablets.UniqueNameAllocator;
import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Provides a server context for Accumulo server components that operate with the system credentials
@@ -53,16 +47,11 @@ import org.slf4j.LoggerFactory;
*/
public class ServerContext extends ClientContext {
- private static final Logger log = LoggerFactory.getLogger(ServerContext.class);
-
private final ServerInfo info;
private TableManager tableManager;
private UniqueNameAllocator nameAllocator;
private ZooReaderWriter zooReaderWriter;
private ServerConfigurationFactory serverConfFactory = null;
- private String applicationName = null;
- private String applicationClassName = null;
- private String hostname = null;
private AuthenticationTokenSecretManager secretManager;
private CryptoService cryptoService = null;
@@ -90,44 +79,19 @@ public class ServerContext extends ClientContext {
zooReaderWriter = new ZooReaderWriter(info.getSiteConfiguration());
}
- public void setupServer(String appName, String appClassName, String hostname) {
- applicationName = appName;
- applicationClassName = appClassName;
- this.hostname = hostname;
- SecurityUtil.serverLogin(info.getSiteConfiguration());
- log.info("Version " + Constants.VERSION);
- log.info("Instance " + info.getInstanceID());
- ServerUtil.init(this, applicationName);
- MetricsSystemHelper.configure(applicationClassName);
- TraceUtil.enableServerTraces(hostname, applicationName,
- getServerConfFactory().getSystemConfiguration());
- if (getSaslParams() != null) {
- // Server-side "client" check to make sure we're logged in as a user we expect to be
- enforceKerberosLogin();
- }
- }
-
/**
* Should only be called by the Tablet server
*/
public synchronized void setupCrypto() throws CryptoService.CryptoException {
- if (cryptoService != null)
+ if (cryptoService != null) {
throw new CryptoService.CryptoException("Crypto Service " + cryptoService.getClass().getName()
+ " already exists and cannot be setup again");
+ }
AccumuloConfiguration acuConf = getConfiguration();
cryptoService = CryptoServiceFactory.newInstance(acuConf, ClassloaderType.ACCUMULO);
}
- public void teardownServer() {
- TraceUtil.disable();
- }
-
- public String getHostname() {
- Objects.requireNonNull(hostname);
- return hostname;
- }
-
public synchronized ServerConfigurationFactory getServerConfFactory() {
if (serverConfFactory == null) {
serverConfFactory = new ServerConfigurationFactory(this, info.getSiteConfiguration());
@@ -244,4 +208,5 @@ public class ServerContext extends ClientContext {
}
return cryptoService;
}
+
}
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCExecutable.java b/server/gc/src/main/java/org/apache/accumulo/gc/GCExecutable.java
index 8bc45bd..2690c29 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GCExecutable.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCExecutable.java
@@ -38,7 +38,7 @@ public class GCExecutable implements KeywordExecutable {
}
@Override
- public void execute(final String[] args) {
+ public void execute(final String[] args) throws Exception {
SimpleGarbageCollector.main(args);
}
}
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index a88eae0..755395d 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -40,7 +40,6 @@ import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.Tables;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -74,8 +73,8 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
import org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences;
+import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManager.FileType;
@@ -106,13 +105,13 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
// Could/Should implement HighlyAvaialbleService but the Thrift server is already started before
// the ZK lock is acquired. The server is only for metrics, there are no concerns about clients
// using the service before the lock is acquired.
-public class SimpleGarbageCollector implements Iface {
+public class SimpleGarbageCollector extends AbstractServer implements Iface {
private static final Text EMPTY_TEXT = new Text();
/**
* Options for the garbage collector.
*/
- static class Opts extends ServerOpts {
+ static class GCOpts extends ServerOpts {
@Parameter(names = {"-v", "--verbose"},
description = "extra information will get printed to stdout also")
boolean verbose = false;
@@ -128,32 +127,21 @@ public class SimpleGarbageCollector implements Iface {
private static final Logger log = LoggerFactory.getLogger(SimpleGarbageCollector.class);
- private ServerContext context;
- private VolumeManager fs;
- private Opts opts;
+ private GCOpts opts;
private ZooLock lock;
private GCStatus status =
new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());
- public static void main(String[] args) {
- final String app = "gc";
- Opts opts = new Opts();
- opts.parseArgs(app, args);
- ServerContext context = new ServerContext(opts.getSiteConfiguration());
- context.setupServer(app, SimpleGarbageCollector.class.getName(), opts.getAddress());
- try {
- SimpleGarbageCollector gc = new SimpleGarbageCollector(opts, context);
- gc.run();
- } finally {
- context.teardownServer();
+ public static void main(String[] args) throws Exception {
+ try (SimpleGarbageCollector gc = new SimpleGarbageCollector(new GCOpts(), args)) {
+ gc.runServer();
}
}
- public SimpleGarbageCollector(Opts opts, ServerContext context) {
- this.context = context;
+ SimpleGarbageCollector(GCOpts opts, String[] args) {
+ super("gc", opts, args);
this.opts = opts;
- this.fs = context.getVolumeManager();
long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY);
log.info("start delay: {} milliseconds", getStartDelay());
@@ -165,18 +153,6 @@ public class SimpleGarbageCollector implements Iface {
log.info("delete threads: {}", getNumDeleteThreads());
}
- ServerContext getContext() {
- return context;
- }
-
- AccumuloConfiguration getConfiguration() {
- return context.getConfiguration();
- }
-
- AccumuloClient getClient() {
- return context;
- }
-
/**
* Gets the delay before the first collection.
*
@@ -187,15 +163,6 @@ public class SimpleGarbageCollector implements Iface {
}
/**
- * Gets the volume manager used by this GC.
- *
- * @return volume manager
- */
- VolumeManager getVolumeManager() {
- return fs;
- }
-
- /**
* Checks if the volume manager should move files to the trash rather than delete them.
*
* @return true if trash is used
@@ -205,13 +172,6 @@ public class SimpleGarbageCollector implements Iface {
}
/**
- * Gets the options for this garbage collector.
- */
- Opts getOpts() {
- return opts;
- }
-
- /**
* Gets the number of threads used for deleting files.
*
* @return number of delete threads
@@ -241,7 +201,7 @@ public class SimpleGarbageCollector implements Iface {
range.getEndKey(), range.isEndKeyInclusive());
}
- Scanner scanner = getClient().createScanner(tableName, Authorizations.EMPTY);
+ Scanner scanner = getContext().createScanner(tableName, Authorizations.EMPTY);
scanner.setRange(range);
result.clear();
// find candidates for deletion; chop off the prefix
@@ -263,7 +223,7 @@ public class SimpleGarbageCollector implements Iface {
public Iterator<String> getBlipIterator() throws TableNotFoundException {
@SuppressWarnings("resource")
IsolatedScanner scanner =
- new IsolatedScanner(getClient().createScanner(tableName, Authorizations.EMPTY));
+ new IsolatedScanner(getContext().createScanner(tableName, Authorizations.EMPTY));
scanner.setRange(MetadataSchema.BlipSection.getRange());
@@ -275,7 +235,7 @@ public class SimpleGarbageCollector implements Iface {
public Stream<Reference> getReferences() {
Stream<TabletMetadata> tabletStream = TabletsMetadata.builder().scanTable(tableName)
- .checkConsistency().fetchDir().fetchFiles().fetchScans().build(getClient()).stream();
+ .checkConsistency().fetchDir().fetchFiles().fetchScans().build(getContext()).stream();
Stream<Reference> refStream = tabletStream.flatMap(tm -> {
Stream<Reference> refs = Stream.concat(tm.getFiles().stream(), tm.getScans().stream())
@@ -291,25 +251,28 @@ public class SimpleGarbageCollector implements Iface {
@Override
public Set<TableId> getTableIDs() {
- return Tables.getIdToNameMap(context).keySet();
+ return Tables.getIdToNameMap(getContext()).keySet();
}
@Override
public void delete(SortedMap<String,String> confirmedDeletes) throws TableNotFoundException {
+ final VolumeManager fs = getContext().getVolumeManager();
if (opts.safeMode) {
- if (opts.verbose)
+ if (opts.verbose) {
System.out.println("SAFEMODE: There are " + confirmedDeletes.size()
+ " data file candidates marked for deletion.%n"
+ " Examine the log files to identify them.%n");
+ }
log.info("SAFEMODE: Listing all data file candidates for deletion");
- for (String s : confirmedDeletes.values())
+ for (String s : confirmedDeletes.values()) {
log.info("SAFEMODE: {}", s);
+ }
log.info("SAFEMODE: End candidates for deletion");
return;
}
- AccumuloClient c = getClient();
+ AccumuloClient c = getContext();
BatchWriter writer = c.createBatchWriter(tableName, new BatchWriterConfig());
// when deleting a dir and all files in that dir, only need to delete the dir
@@ -396,12 +359,13 @@ public class SimpleGarbageCollector implements Iface {
if (parts.length > 2) {
TableId tableId = TableId.of(parts[1]);
String tabletDir = parts[2];
- context.getTableManager().updateTableStateCache(tableId);
- TableState tableState = context.getTableManager().getTableState(tableId);
+ getContext().getTableManager().updateTableStateCache(tableId);
+ TableState tableState = getContext().getTableManager().getTableState(tableId);
if (tableState != null && tableState != TableState.DELETING) {
// clone directories don't always exist
- if (!tabletDir.startsWith(Constants.CLONE_PREFIX))
+ if (!tabletDir.startsWith(Constants.CLONE_PREFIX)) {
log.debug("File doesn't exist: {}", fullPath);
+ }
}
} else {
log.warn("Very strange path name: {}", delete);
@@ -441,9 +405,10 @@ public class SimpleGarbageCollector implements Iface {
@Override
public void deleteTableDirIfEmpty(TableId tableID) throws IOException {
+ final VolumeManager fs = getContext().getVolumeManager();
// if dir exist and is empty, then empty list is returned...
// hadoop 2.0 will throw an exception if the file does not exist
- for (String dir : ServerConstants.getTablesDirs(context)) {
+ for (String dir : ServerConstants.getTablesDirs(getContext())) {
FileStatus[] tabletDirs = null;
try {
tabletDirs = fs.listStatus(new Path(dir + "/" + tableID));
@@ -454,8 +419,9 @@ public class SimpleGarbageCollector implements Iface {
if (tabletDirs.length == 0) {
Path p = new Path(dir + "/" + tableID);
log.debug("Removing table dir {}", p);
- if (!moveToTrash(p))
+ if (!moveToTrash(p)) {
fs.delete(p);
+ }
}
}
}
@@ -472,7 +438,7 @@ public class SimpleGarbageCollector implements Iface {
@Override
public Iterator<Entry<String,Status>> getReplicationNeededIterator() {
- AccumuloClient client = getClient();
+ AccumuloClient client = getContext();
try {
Scanner s = ReplicationTable.getScanner(client);
StatusSection.limit(s);
@@ -494,8 +460,10 @@ public class SimpleGarbageCollector implements Iface {
}
}
+ @Override
@SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit")
- private void run() {
+ public void run() {
+ final VolumeManager fs = getContext().getVolumeManager();
long tStart, tStop;
// Sleep for an initial period, giving the master time to start up and
@@ -555,7 +523,7 @@ public class SimpleGarbageCollector implements Iface {
* GarbageCollectWriteAheadLogs to ensure we delete as many files as possible.
*/
try (TraceScope replSpan = Trace.startSpan("replicationClose")) {
- CloseWriteAheadLogReferences closeWals = new CloseWriteAheadLogReferences(context);
+ CloseWriteAheadLogReferences closeWals = new CloseWriteAheadLogReferences(getContext());
closeWals.run();
} catch (Exception e) {
log.error("Error trying to close write-ahead logs for replication table", e);
@@ -564,7 +532,7 @@ public class SimpleGarbageCollector implements Iface {
// Clean up any unused write-ahead logs
try (TraceScope waLogs = Trace.startSpan("walogs")) {
GarbageCollectWriteAheadLogs walogCollector =
- new GarbageCollectWriteAheadLogs(context, fs, isUsingTrash());
+ new GarbageCollectWriteAheadLogs(getContext(), fs, isUsingTrash());
log.info("Beginning garbage collection of write-ahead logs");
walogCollector.collect(status);
} catch (Exception e) {
@@ -574,7 +542,7 @@ public class SimpleGarbageCollector implements Iface {
// we just made a lot of metadata changes: flush them out
try {
- AccumuloClient accumuloClient = getClient();
+ AccumuloClient accumuloClient = getContext();
accumuloClient.tableOperations().compact(MetadataTable.NAME, null, null, true, true);
accumuloClient.tableOperations().compact(RootTable.NAME, null, null, true, true);
} catch (Exception e) {
@@ -602,8 +570,10 @@ public class SimpleGarbageCollector implements Iface {
* if the volume manager encountered a problem
*/
boolean moveToTrash(Path path) throws IOException {
- if (!isUsingTrash())
+ final VolumeManager fs = getContext().getVolumeManager();
+ if (!isUsingTrash()) {
return false;
+ }
try {
return fs.moveToTrash(path);
} catch (FileNotFoundException ex) {
@@ -612,7 +582,7 @@ public class SimpleGarbageCollector implements Iface {
}
private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedException {
- String path = context.getZooKeeperRoot() + Constants.ZGC_LOCK;
+ String path = getContext().getZooKeeperRoot() + Constants.ZGC_LOCK;
LockWatcher lockWatcher = new LockWatcher() {
@Override
@@ -629,7 +599,7 @@ public class SimpleGarbageCollector implements Iface {
};
while (true) {
- lock = new ZooLock(context.getZooReaderWriter(), path);
+ lock = new ZooLock(getContext().getZooReaderWriter(), path);
if (lock.tryLock(lockWatcher,
new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes())) {
log.debug("Got GC ZooKeeper lock");
@@ -643,7 +613,7 @@ public class SimpleGarbageCollector implements Iface {
private HostAndPort startStatsService() {
Iface rpcProxy = TraceUtil.wrapService(this);
final Processor<Iface> processor;
- if (context.getThriftServerType() == ThriftServerType.SASL) {
+ if (getContext().getThriftServerType() == ThriftServerType.SASL) {
Iface tcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass(), getConfiguration());
processor = new Processor<>(tcProxy);
} else {
@@ -654,10 +624,11 @@ public class SimpleGarbageCollector implements Iface {
long maxMessageSize = getConfiguration().getAsBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
try {
ServerAddress server =
- TServerUtils.startTServer(getConfiguration(), context.getThriftServerType(), processor,
- this.getClass().getSimpleName(), "GC Monitor Service", 2,
+ TServerUtils.startTServer(getConfiguration(), getContext().getThriftServerType(),
+ processor, this.getClass().getSimpleName(), "GC Monitor Service", 2,
getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000,
- maxMessageSize, context.getServerSslParams(), context.getSaslParams(), 0, addresses);
+ maxMessageSize, getContext().getServerSslParams(), getContext().getSaslParams(), 0,
+ addresses);
log.debug("Starting garbage collector listening on " + server.address);
return server.address;
} catch (Exception ex) {
@@ -699,9 +670,11 @@ public class SimpleGarbageCollector implements Iface {
return false;
}
int slashCount = 0;
- for (int i = 0; i < delete.length(); i++)
- if (delete.charAt(i) == '/')
+ for (int i = 0; i < delete.length(); i++) {
+ if (delete.charAt(i) == '/') {
slashCount++;
+ }
+ }
return slashCount == 1;
}
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorOptsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorOptsTest.java
index b91784d..cec83ca 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorOptsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorOptsTest.java
@@ -18,16 +18,16 @@ package org.apache.accumulo.gc;
import static org.junit.Assert.assertFalse;
-import org.apache.accumulo.gc.SimpleGarbageCollector.Opts;
+import org.apache.accumulo.gc.SimpleGarbageCollector.GCOpts;
import org.junit.Before;
import org.junit.Test;
public class SimpleGarbageCollectorOptsTest {
- private Opts opts;
+ private GCOpts opts;
@Before
public void setUp() {
- opts = new Opts();
+ opts = new GCOpts();
}
@Test
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
index 376bb59..8bf950e 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
@@ -20,11 +20,11 @@ import static org.apache.accumulo.gc.SimpleGarbageCollector.CANDIDATE_MEMORY_PER
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.partialMockBuilder;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -36,9 +36,6 @@ import org.apache.accumulo.core.clientImpl.Credentials;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
-import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
-import org.apache.accumulo.core.trace.thrift.TInfo;
-import org.apache.accumulo.gc.SimpleGarbageCollector.Opts;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.security.SystemCredentials;
@@ -50,7 +47,6 @@ public class SimpleGarbageCollectorTest {
private VolumeManager volMgr;
private ServerContext context;
private Credentials credentials;
- private Opts opts;
private SimpleGarbageCollector gc;
private ConfigurationCopy systemConfig;
private static SiteConfiguration siteConfig = new SiteConfiguration();
@@ -63,7 +59,6 @@ public class SimpleGarbageCollectorTest {
expect(context.getZooKeepers()).andReturn("localhost").anyTimes();
expect(context.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
- opts = new Opts();
systemConfig = createSystemConfig();
expect(context.getConfiguration()).andReturn(systemConfig).anyTimes();
expect(context.getVolumeManager()).andReturn(volMgr).anyTimes();
@@ -75,13 +70,10 @@ public class SimpleGarbageCollectorTest {
replay(context);
- gc = new SimpleGarbageCollector(opts, context);
- }
-
- @Test
- public void testConstruction() {
- assertSame(opts, gc.getOpts());
- assertNotNull(gc.getStatus(createMock(TInfo.class), createMock(TCredentials.class)));
+ gc = partialMockBuilder(SimpleGarbageCollector.class).addMockedMethod("getContext")
+ .createMock();
+ expect(gc.getContext()).andReturn(context).anyTimes();
+ replay(gc);
}
private ConfigurationCopy createSystemConfig() {
@@ -97,7 +89,7 @@ public class SimpleGarbageCollectorTest {
@Test
public void testInit() {
- assertSame(volMgr, gc.getVolumeManager());
+ assertSame(volMgr, gc.getContext().getVolumeManager());
assertEquals(credentials, gc.getContext().getCredentials());
assertTrue(gc.isUsingTrash());
assertEquals(1000L, gc.getStartDelay());
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 9e8f95b..3577530 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -96,6 +96,7 @@ import org.apache.accumulo.master.replication.MasterReplicationCoordinator;
import org.apache.accumulo.master.replication.ReplicationDriver;
import org.apache.accumulo.master.replication.WorkDriver;
import org.apache.accumulo.master.state.TableCounts;
+import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.HighlyAvailableService;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.ServerContext;
@@ -175,7 +176,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
* <p>
* The master will also coordinate log recoveries and reports general status.
*/
-public class Master
+public class Master extends AbstractServer
implements LiveTServerSet.Listener, TableObserver, CurrentState, HighlyAvailableService {
static final Logger log = LoggerFactory.getLogger(Master.class);
@@ -191,7 +192,6 @@ public class Master
private static final int MAX_BAD_STATUS_COUNT = 3;
final VolumeManager fs;
- private final String hostname;
private final Object balancedNotifier = new Object();
final LiveTServerSet tserverSet;
private final List<TabletGroupWatcher> watchers = new ArrayList<>();
@@ -279,6 +279,7 @@ public class Master
}
private void moveRootTabletToRootTable(IZooReaderWriter zoo) throws Exception {
+ ServerContext context = getContext();
String dirZPath = getZooKeeperRoot() + RootTable.ZROOT_TABLET_PATH;
if (!zoo.exists(dirZPath)) {
@@ -324,6 +325,8 @@ public class Master
@SuppressFBWarnings(value = "DM_EXIT",
justification = "TODO probably not the best to call System.exit here")
private void upgradeZookeeper() {
+ ServerContext context = getContext();
+
// 1.5.1 and 1.6.0 both do some state checking after obtaining the zoolock for the
// monitor and before starting up. It's not tied to the data version at all (and would
// introduce unnecessary complexity to try to make the master do it), but be aware
@@ -339,7 +342,7 @@ public class Master
+ " initialized prior to the Master transitioning to active. Please"
+ " save all logs and file a bug.");
}
- ServerUtil.abortIfFateTransactions(getContext());
+ ServerUtil.abortIfFateTransactions(context);
try {
log.info("Upgrading zookeeper");
@@ -461,7 +464,7 @@ public class Master
// that uses the ZKPermHandler for permissions storage so long
// as the PermHandler only overrides the user name, and we don't care what the user name is.
ZKPermHandler perm = new ZKPermHandler();
- perm.initialize(getContext(), true);
+ perm.initialize(context, true);
String users = getZooKeeperRoot() + "/users";
for (String user : zoo.getChildren(users)) {
zoo.putPersistentData(users + "/" + user + "/Namespaces", new byte[0],
@@ -498,7 +501,6 @@ public class Master
private final AtomicBoolean upgradeMetadataRunning = new AtomicBoolean(false);
private final CountDownLatch waitForMetadataUpgrade = new CountDownLatch(1);
- private final ServerContext context;
private final ServerConfigurationFactory serverConfig;
private MasterClientServiceHandler clientHandler;
@@ -529,6 +531,7 @@ public class Master
justification = "TODO probably not the best to call System.exit here")
@Override
public void run() {
+ ServerContext context = getContext();
try {
log.info("Starting to upgrade metadata table.");
if (version == ServerConstants.MOVE_DELETE_MARKERS - 1) {
@@ -611,7 +614,7 @@ public class Master
case NORMAL:
// Count offline tablets for online tables
for (TabletGroupWatcher watcher : watchers) {
- TableManager manager = context.getTableManager();
+ TableManager manager = getContext().getTableManager();
for (Entry<TableId,TableCounts> entry : watcher.getStats().entrySet()) {
TableId tableId = entry.getKey();
TableCounts counts = entry.getValue();
@@ -644,6 +647,7 @@ public class Master
}
public void mustBeOnline(final TableId tableId) throws ThriftTableOperationException {
+ ServerContext context = getContext();
Tables.clearCache(context);
if (!Tables.getTableState(context, tableId).equals(TableState.ONLINE)) {
throw new ThriftTableOperationException(tableId.canonical(), null, TableOperation.MERGE,
@@ -651,19 +655,21 @@ public class Master
}
}
- public ServerContext getContext() {
- return context;
+ public TableManager getTableManager() {
+ return getContext().getTableManager();
}
- public TableManager getTableManager() {
- return context.getTableManager();
+ public static void main(String[] args) throws Exception {
+ try (Master master = new Master(new ServerOpts(), args)) {
+ master.runServer();
+ }
}
- public Master(ServerContext context) throws IOException {
- this.context = context;
+ Master(ServerOpts opts, String[] args) throws IOException {
+ super("master", opts, args);
+ ServerContext context = super.getContext();
this.serverConfig = context.getServerConfFactory();
this.fs = context.getVolumeManager();
- this.hostname = context.getHostname();
AccumuloConfiguration aconf = serverConfig.getSystemConfiguration();
@@ -677,18 +683,14 @@ public class Master
Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer());
this.tabletBalancer.init(context);
- try {
- AccumuloVFSClassLoader.getContextManager()
- .setContextConfig(new ContextManager.DefaultContextsConfig() {
- @Override
- public Map<String,String> getVfsContextClasspathProperties() {
- return getConfiguration()
- .getAllPropertiesWithPrefix(Property.VFS_CONTEXT_CLASSPATH_PROPERTY);
- }
- });
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ AccumuloVFSClassLoader.getContextManager()
+ .setContextConfig(new ContextManager.DefaultContextsConfig() {
+ @Override
+ public Map<String,String> getVfsContextClasspathProperties() {
+ return getConfiguration()
+ .getAllPropertiesWithPrefix(Property.VFS_CONTEXT_CLASSPATH_PROPERTY);
+ }
+ });
this.security = AuditedSecurityOperation.getInstance(context);
@@ -717,15 +719,11 @@ public class Master
}
public String getInstanceID() {
- return context.getInstanceID();
+ return getContext().getInstanceID();
}
public String getZooKeeperRoot() {
- return context.getZooKeeperRoot();
- }
-
- public AccumuloConfiguration getConfiguration() {
- return context.getConfiguration();
+ return getContext().getZooKeeperRoot();
}
public TServerConnection getConnection(TServerInstance server) {
@@ -733,6 +731,7 @@ public class Master
}
public MergeInfo getMergeInfo(TableId tableId) {
+ ServerContext context = getContext();
synchronized (mergeLock) {
try {
String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge";
@@ -757,6 +756,7 @@ public class Master
public void setMergeState(MergeInfo info, MergeState state)
throws KeeperException, InterruptedException {
+ ServerContext context = getContext();
synchronized (mergeLock) {
String path =
getZooKeeperRoot() + Constants.ZTABLES + "/" + info.getExtent().getTableId() + "/merge";
@@ -782,7 +782,7 @@ public class Master
public void clearMergeState(TableId tableId) throws KeeperException, InterruptedException {
synchronized (mergeLock) {
String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge";
- context.getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP);
+ getContext().getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP);
mergeLock.notifyAll();
}
nextEvent.event("Merge state of %s cleared", tableId);
@@ -790,7 +790,7 @@ public class Master
void setMasterGoalState(MasterGoalState state) {
try {
- context.getZooReaderWriter().putPersistentData(
+ getContext().getZooReaderWriter().putPersistentData(
getZooKeeperRoot() + Constants.ZMASTER_GOAL_STATE, state.name().getBytes(),
NodeExistsPolicy.OVERWRITE);
} catch (Exception ex) {
@@ -801,7 +801,7 @@ public class Master
MasterGoalState getMasterGoalState() {
while (true) {
try {
- byte[] data = context.getZooReaderWriter()
+ byte[] data = getContext().getZooReaderWriter()
.getData(getZooKeeperRoot() + Constants.ZMASTER_GOAL_STATE, null);
return MasterGoalState.valueOf(new String(data));
} catch (Exception e) {
@@ -871,7 +871,7 @@ public class Master
}
TabletGoalState getTableGoalState(KeyExtent extent) {
- TableState tableState = context.getTableManager().getTableState(extent.getTableId());
+ TableState tableState = getContext().getTableManager().getTableState(extent.getTableId());
if (tableState == null) {
return TabletGoalState.DELETED;
}
@@ -946,7 +946,7 @@ public class Master
if (!migrations.isEmpty()) {
try {
cleanupOfflineMigrations();
- cleanupNonexistentMigrations(context);
+ cleanupNonexistentMigrations(getContext());
} catch (Exception ex) {
log.error("Error cleaning up migrations", ex);
}
@@ -979,6 +979,7 @@ public class Master
* tablet server will load the tablet. check for offline tables and remove their migrations.
*/
private void cleanupOfflineMigrations() {
+ ServerContext context = getContext();
TableManager manager = context.getTableManager();
for (TableId tableId : Tables.getIdToNameMap(context).keySet()) {
TableState state = manager.getTableState(tableId);
@@ -1258,7 +1259,9 @@ public class Master
return info;
}
- public void run() throws IOException, InterruptedException, KeeperException {
+ @Override
+ public void run() {
+ final ServerContext context = getContext();
final String zroot = getZooKeeperRoot();
// ACCUMULO-4424 Put up the Thrift servers before getting the lock as a sign of process health
@@ -1277,14 +1280,23 @@ public class Master
} else {
processor = new Processor<>(rpcProxy);
}
- ServerAddress sa = TServerUtils.startServer(context, hostname, Property.MASTER_CLIENTPORT,
- processor, "Master", "Master Client Service Handler", null, Property.MASTER_MINTHREADS,
- Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
+ ServerAddress sa;
+ try {
+ sa = TServerUtils.startServer(context, getHostname(), Property.MASTER_CLIENTPORT, processor,
+ "Master", "Master Client Service Handler", null, Property.MASTER_MINTHREADS,
+ Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
+ } catch (UnknownHostException e) {
+ throw new IllegalStateException("Unable to start server on host " + getHostname(), e);
+ }
clientService = sa.server;
log.info("Started Master client service at {}", sa.address);
// block until we can obtain the ZK lock for the master
- getMasterLock(zroot + Constants.ZMASTER_LOCK);
+ try {
+ getMasterLock(zroot + Constants.ZMASTER_LOCK);
+ } catch (KeeperException | InterruptedException e) {
+ throw new IllegalStateException("Exception getting master lock", e);
+ }
recoveryManager = new RecoveryManager(this);
@@ -1300,18 +1312,22 @@ public class Master
ZooReaderWriter zReaderWriter = context.getZooReaderWriter();
- zReaderWriter.getChildren(zroot + Constants.ZRECOVERY, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- nextEvent.event("Noticed recovery changes %s", event.getType());
- try {
- // watcher only fires once, add it back
- zReaderWriter.getChildren(zroot + Constants.ZRECOVERY, this);
- } catch (Exception e) {
- log.error("Failed to add log recovery watcher back", e);
+ try {
+ zReaderWriter.getChildren(zroot + Constants.ZRECOVERY, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ nextEvent.event("Noticed recovery changes %s", event.getType());
+ try {
+ // watcher only fires once, add it back
+ zReaderWriter.getChildren(zroot + Constants.ZRECOVERY, this);
+ } catch (Exception e) {
+ log.error("Failed to add log recovery watcher back", e);
+ }
}
- }
- });
+ });
+ } catch (KeeperException | InterruptedException e) {
+ throw new IllegalStateException("Unable to read " + zroot + Constants.ZRECOVERY, e);
+ }
watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(context, this), null) {
@Override
@@ -1346,7 +1362,11 @@ public class Master
}
// Once we are sure the upgrade is complete, we can safely allow fate use.
- waitForMetadataUpgrade.await();
+ try {
+ waitForMetadataUpgrade.await();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Metadata upgrade interrupted", e);
+ }
try {
final AgeOffStore<Master> store = new AgeOffStore<>(new org.apache.accumulo.fate.ZooStore<>(
@@ -1359,16 +1379,24 @@ public class Master
SimpleTimer.getInstance(getConfiguration()).schedule(() -> store.ageOff(), 63000, 63000);
} catch (KeeperException | InterruptedException e) {
- throw new IOException(e);
+ throw new IllegalStateException("Exception setting up FaTE cleanup thread", e);
}
- ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot);
+ try {
+ ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot);
+ } catch (KeeperException | InterruptedException e) {
+ throw new IllegalStateException("Exception while ensuring ZooKeeper is initialized", e);
+ }
// Make sure that we have a secret key (either a new one or an old one from ZK) before we start
// the master client service.
if (authenticationTokenKeyManager != null && keyDistributor != null) {
log.info("Starting delegation-token key manager");
- keyDistributor.initialize();
+ try {
+ keyDistributor.initialize();
+ } catch (KeeperException | InterruptedException e) {
+ throw new IllegalStateException("Exception setting up delegation-token key manager", e);
+ }
authenticationTokenKeyManager.start();
boolean logged = false;
while (!authenticationTokenKeyManager.isInitialized()) {
@@ -1385,7 +1413,11 @@ public class Master
String address = sa.address.toString();
log.info("Setting master lock data to {}", address);
- masterLock.replaceLockData(address.getBytes());
+ try {
+ masterLock.replaceLockData(address.getBytes());
+ } catch (KeeperException | InterruptedException e) {
+ throw new IllegalStateException("Exception updating master lock", e);
+ }
while (!clientService.isServing()) {
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
@@ -1430,42 +1462,55 @@ public class Master
timeKeeper.shutdown();
final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
- statusThread.join(remaining(deadline));
- if (replicationWorkAssigner != null) {
- replicationWorkAssigner.join(remaining(deadline));
- }
- if (replicationWorkDriver != null) {
- replicationWorkDriver.join(remaining(deadline));
+ try {
+ statusThread.join(remaining(deadline));
+ if (replicationWorkAssigner != null) {
+ replicationWorkAssigner.join(remaining(deadline));
+ }
+ if (replicationWorkDriver != null) {
+ replicationWorkDriver.join(remaining(deadline));
+ }
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Exception starting replication workers", e);
}
TServerUtils.stopTServer(replServer.get());
// Signal that we want it to stop, and wait for it to do so.
if (authenticationTokenKeyManager != null) {
authenticationTokenKeyManager.gracefulStop();
- authenticationTokenKeyManager.join(remaining(deadline));
+ try {
+ authenticationTokenKeyManager.join(remaining(deadline));
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Exception waiting on delegation-token key manager", e);
+ }
}
// quit, even if the tablet servers somehow jam up and the watchers
// don't stop
for (TabletGroupWatcher watcher : watchers) {
- watcher.join(remaining(deadline));
+ try {
+ watcher.join(remaining(deadline));
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Exception waiting on watcher", e);
+ }
}
log.info("exiting");
}
private TServer setupReplication()
throws UnknownHostException, KeeperException, InterruptedException {
+ ServerContext context = getContext();
// Start the replication coordinator which assigns tservers to service replication requests
MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this);
ReplicationCoordinator.Iface haReplicationProxy =
HighlyAvailableServiceWrapper.service(impl, this);
ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor =
new ReplicationCoordinator.Processor<>(TraceUtil.wrapService(haReplicationProxy));
- ServerAddress replAddress =
- TServerUtils.startServer(context, hostname, Property.MASTER_REPLICATION_COORDINATOR_PORT,
- replicationCoordinatorProcessor, "Master Replication Coordinator",
- "Replication Coordinator", null, Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
- Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
+ ServerAddress replAddress = TServerUtils.startServer(context, getHostname(),
+ Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor,
+ "Master Replication Coordinator", "Replication Coordinator", null,
+ Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
+ Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
log.info("Started replication coordinator service at " + replAddress.address);
// Start the daemon to scan the replication table and make units of work
@@ -1549,15 +1594,16 @@ public class Master
}
private void getMasterLock(final String zMasterLoc) throws KeeperException, InterruptedException {
+ ServerContext context = getContext();
log.info("trying to get master lock");
final String masterClientAddress =
- hostname + ":" + getConfiguration().getPort(Property.MASTER_CLIENTPORT)[0];
+ getHostname() + ":" + getConfiguration().getPort(Property.MASTER_CLIENTPORT)[0];
while (true) {
MasterLockWatcher masterLockWatcher = new MasterLockWatcher();
- masterLock = new ZooLock(getContext().getZooReaderWriter(), zMasterLoc);
+ masterLock = new ZooLock(context.getZooReaderWriter(), zMasterLoc);
masterLock.lockAsync(masterLockWatcher, masterClientAddress.getBytes());
masterLockWatcher.waitForChange();
@@ -1578,27 +1624,13 @@ public class Master
setMasterState(MasterState.HAVE_LOCK);
}
- public static void main(String[] args) throws Exception {
- final String app = "master";
- ServerOpts opts = new ServerOpts();
- opts.parseArgs(app, args);
- ServerContext context = new ServerContext(opts.getSiteConfiguration());
- context.setupServer(app, Master.class.getName(), opts.getAddress());
- try {
- Master master = new Master(context);
- master.run();
- } finally {
- context.teardownServer();
- }
- }
-
@Override
public void update(LiveTServerSet current, Set<TServerInstance> deleted,
Set<TServerInstance> added) {
// if we have deleted or added tservers, then adjust our dead server list
if (!deleted.isEmpty() || !added.isEmpty()) {
DeadServerList obit =
- new DeadServerList(context, getZooKeeperRoot() + Constants.ZDEADTSERVERS);
+ new DeadServerList(getContext(), getZooKeeperRoot() + Constants.ZDEADTSERVERS);
if (added.size() > 0) {
log.info("New servers: {}", added);
for (TServerInstance up : added) {
@@ -1697,6 +1729,7 @@ public class Master
}
return result;
}
+ ServerContext context = getContext();
TableManager manager = context.getTableManager();
for (TableId tableId : Tables.getIdToNameMap(context).keySet()) {
@@ -1718,7 +1751,7 @@ public class Master
@Override
public Collection<MergeInfo> merges() {
List<MergeInfo> result = new ArrayList<>();
- for (TableId tableId : Tables.getIdToNameMap(context).keySet()) {
+ for (TableId tableId : Tables.getIdToNameMap(getContext()).keySet()) {
result.add(getMergeInfo(tableId));
}
return result;
@@ -1799,7 +1832,8 @@ public class Master
result.serversShuttingDown.add(server.hostPort());
}
}
- DeadServerList obit = new DeadServerList(context, getZooKeeperRoot() + Constants.ZDEADTSERVERS);
+ DeadServerList obit =
+ new DeadServerList(getContext(), getZooKeeperRoot() + Constants.ZDEADTSERVERS);
result.deadTabletServers = obit.getList();
result.bulkImports = bulkImportStatus.getBulkLoadStatus();
return result;
@@ -1830,7 +1864,7 @@ public class Master
public void markDeadServerLogsAsClosed(Map<TServerInstance,List<Path>> logsForDeadServers)
throws WalMarkerException {
- WalStateManager mgr = new WalStateManager(context);
+ WalStateManager mgr = new WalStateManager(getContext());
for (Entry<TServerInstance,List<Path>> server : logsForDeadServers.entrySet()) {
for (Path path : server.getValue()) {
mgr.closeWal(server.getKey(), path);
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java
index b40ee7d..bac8108 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java
@@ -43,7 +43,7 @@ public class EmbeddedWebServer {
server = new Server();
final AccumuloConfiguration conf = monitor.getContext().getConfiguration();
connector = new ServerConnector(server, getConnectionFactories(conf));
- connector.setHost(monitor.getContext().getHostname());
+ connector.setHost(monitor.getHostname());
connector.setPort(port);
handler =
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index a3003b0..929d1c0 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -63,6 +63,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.HighlyAvailableService;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
@@ -89,16 +90,21 @@ import org.slf4j.LoggerFactory;
/**
* Serve master statistics with an embedded web server.
*/
-public class Monitor implements Runnable, HighlyAvailableService {
+public class Monitor extends AbstractServer implements HighlyAvailableService {
private static final Logger log = LoggerFactory.getLogger(Monitor.class);
private static final int REFRESH_TIME = 5;
- private final ServerContext context;
private final long START_TIME;
- Monitor(ServerContext context) {
- this.context = context;
+ public static void main(String[] args) throws Exception {
+ try (Monitor monitor = new Monitor(new ServerOpts(), args)) {
+ monitor.runServer();
+ }
+ }
+
+ Monitor(ServerOpts opts, String[] args) {
+ super("monitor", opts, args);
START_TIME = System.currentTimeMillis();
}
@@ -216,6 +222,7 @@ public class Monitor implements Runnable, HighlyAvailableService {
}
public void fetchData() {
+ ServerContext context = getContext();
double totalIngestRate = 0.;
double totalIngestByteRate = 0.;
double totalQueryRate = 0.;
@@ -374,6 +381,7 @@ public class Monitor implements Runnable, HighlyAvailableService {
}
private GCStatus fetchGcStatus() {
+ ServerContext context = getContext();
GCStatus result = null;
HostAndPort address = null;
try {
@@ -399,23 +407,10 @@ public class Monitor implements Runnable, HighlyAvailableService {
return result;
}
- public static void main(String[] args) {
- final String app = "monitor";
- ServerOpts opts = new ServerOpts();
- opts.parseArgs(app, args);
- ServerContext context = new ServerContext(opts.getSiteConfiguration());
- context.setupServer(app, Monitor.class.getName(), opts.getAddress());
- try {
- Monitor monitor = new Monitor(context);
- monitor.run();
- } finally {
- context.teardownServer();
- }
- }
-
@Override
public void run() {
- int[] ports = context.getConfiguration().getPort(Property.MONITOR_PORT);
+ ServerContext context = getContext();
+ int[] ports = getConfiguration().getPort(Property.MONITOR_PORT);
for (int port : ports) {
try {
log.debug("Creating monitor on port {}", port);
@@ -441,7 +436,7 @@ public class Monitor implements Runnable, HighlyAvailableService {
throw new RuntimeException(e);
}
- String advertiseHost = context.getHostname();
+ String advertiseHost = getHostname();
if (advertiseHost.equals("0.0.0.0")) {
try {
advertiseHost = InetAddress.getLocalHost().getHostName();
@@ -572,6 +567,7 @@ public class Monitor implements Runnable, HighlyAvailableService {
}
private void fetchScans() throws Exception {
+ ServerContext context = getContext();
for (String server : context.instanceOperations().getTabletServers()) {
final HostAndPort parsedServer = HostAndPort.fromString(server);
Client tserver = ThriftUtil.getTServerClient(parsedServer, context);
@@ -601,6 +597,7 @@ public class Monitor implements Runnable, HighlyAvailableService {
* Get the monitor lock in ZooKeeper
*/
private void getMonitorLock() throws KeeperException, InterruptedException {
+ ServerContext context = getContext();
final String zRoot = context.getZooKeeperRoot();
final String monitorPath = zRoot + Constants.ZMONITOR;
final String monitorLockPath = zRoot + Constants.ZMONITOR_LOCK;
@@ -808,10 +805,6 @@ public class Monitor implements Runnable, HighlyAvailableService {
return new ArrayList<>(dataCacheHitRateOverTime);
}
- public ServerContext getContext() {
- return context;
- }
-
@Override
public boolean isActiveService() {
return monitorInitialized.get();
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/MonitorExecutable.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/MonitorExecutable.java
index 65c0a31..ddd34c5 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/MonitorExecutable.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/MonitorExecutable.java
@@ -39,7 +39,7 @@ public class MonitorExecutable implements KeywordExecutable {
}
@Override
- public void execute(final String[] args) {
+ public void execute(final String[] args) throws Exception {
Monitor.main(args);
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 5a54bc6..124a019 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
@@ -170,6 +169,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.GarbageCollectionLogger;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
@@ -260,7 +260,6 @@ import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.thrift.TException;
@@ -275,7 +274,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
-public class TabletServer implements Runnable {
+public class TabletServer extends AbstractServer {
private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
@@ -310,7 +309,6 @@ public class TabletServer implements Runnable {
private final AtomicLong flushCounter = new AtomicLong(0);
private final AtomicLong syncCounter = new AtomicLong(0);
- private final ServerContext context;
private final VolumeManager fs;
private final OnlineTablets onlineTablets = new OnlineTablets();
@@ -351,8 +349,16 @@ public class TabletServer implements Runnable {
private final ZooAuthenticationKeyWatcher authKeyWatcher;
private final WalStateManager walMarker;
- public TabletServer(ServerContext context) {
- this.context = context;
+ public static void main(String[] args) throws Exception {
+ try (TabletServer tserver = new TabletServer(new ServerOpts(), args)) {
+ tserver.runServer();
+ }
+ }
+
+ TabletServer(ServerOpts opts, String[] args) {
+ super("tserver", opts, args);
+ ServerContext context = super.getContext();
+ context.setupCrypto();
this.masterLockCache = new ZooCache(context.getZooReaderWriter(), null);
this.watcher = new TransactionWatcher(context);
this.confFactory = context.getServerConfFactory();
@@ -366,23 +372,25 @@ public class TabletServer implements Runnable {
this.statsKeeper = new TabletStatsKeeper();
SimpleTimer.getInstance(aconf).schedule(() -> {
long now = System.currentTimeMillis();
- for (Tablet tablet : getOnlineTablets().values())
+ for (Tablet tablet : getOnlineTablets().values()) {
try {
tablet.updateRates(now);
} catch (Exception ex) {
log.error("Error updating rates for {}", tablet.getExtent(), ex);
}
+ }
}, 5000, 5000);
final long walogMaxSize = aconf.getAsBytes(Property.TSERV_WALOG_MAX_SIZE);
final long walogMaxAge = aconf.getTimeInMillis(Property.TSERV_WALOG_MAX_AGE);
final long minBlockSize =
context.getHadoopConf().getLong("dfs.namenode.fs-limits.min-block-size", 0);
- if (minBlockSize != 0 && minBlockSize > walogMaxSize)
+ if (minBlockSize != 0 && minBlockSize > walogMaxSize) {
throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize "
+ walogMaxSize + " but hdfs minimum block size is " + minBlockSize
+ ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE
+ " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
+ }
final long toleratedWalCreationFailures =
aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES);
@@ -432,16 +440,8 @@ public class TabletServer implements Runnable {
config();
}
- public ServerContext getContext() {
- return context;
- }
-
public String getInstanceID() {
- return context.getInstanceID();
- }
-
- public AccumuloConfiguration getConfiguration() {
- return context.getConfiguration();
+ return getContext().getInstanceID();
}
public String getVersion() {
@@ -469,7 +469,7 @@ public class TabletServer implements Runnable {
implements TabletClientService.Iface {
ThriftClientHandler() {
- super(context, watcher, fs);
+ super(getContext(), watcher, fs);
log.debug("{} created", ThriftClientHandler.class.getName());
}
@@ -478,9 +478,10 @@ public class TabletServer implements Runnable {
final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean setTime)
throws ThriftSecurityException {
- if (!security.canPerformSystemActions(credentials))
+ if (!security.canPerformSystemActions(credentials)) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
+ }
try {
return watcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
@@ -524,9 +525,10 @@ public class TabletServer implements Runnable {
public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, String dir,
Map<TKeyExtent,Map<String,MapFileInfo>> tabletImports, boolean setTime)
throws ThriftSecurityException {
- if (!security.canPerformSystemActions(credentials))
+ if (!security.canPerformSystemActions(credentials)) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
+ }
watcher.runQuietly(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
tabletImports.forEach((tke, fileMap) -> {
@@ -559,7 +561,7 @@ public class TabletServer implements Runnable {
return null;
}
- return context.getServerConfFactory().getTableConfiguration(extent.getTableId())
+ return getContext().getServerConfFactory().getTableConfiguration(extent.getTableId())
.getScanDispatcher();
}
@@ -576,18 +578,20 @@ public class TabletServer implements Runnable {
TableId tableId = TableId.of(new String(textent.getTable(), UTF_8));
NamespaceId namespaceId;
try {
- namespaceId = Tables.getNamespaceId(context, tableId);
+ namespaceId = Tables.getNamespaceId(getContext(), tableId);
} catch (TableNotFoundException e1) {
throw new NotServingTabletException(textent);
}
if (!security.canScan(credentials, tableId, namespaceId, range, columns, ssiList, ssio,
- authorizations))
+ authorizations)) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
+ }
- if (!security.authenticatedUserHasAuthorizations(credentials, authorizations))
+ if (!security.authenticatedUserHasAuthorizations(credentials, authorizations)) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.BAD_AUTHORIZATIONS);
+ }
final KeyExtent extent = new KeyExtent(textent);
@@ -601,12 +605,14 @@ public class TabletServer implements Runnable {
// the restarted client may not see the write unless we wait here.
// this behavior is very important when the client is reading the
// metadata
- if (waitForWrites)
+ if (waitForWrites) {
writeTracker.waitForWrites(TabletType.type(extent));
+ }
Tablet tablet = getOnlineTablet(extent);
- if (tablet == null)
+ if (tablet == null) {
throw new NotServingTabletException(textent);
+ }
HashSet<Column> columnSet = new HashSet<>();
for (TColumn tcolumn : columns) {
@@ -672,14 +678,14 @@ public class TabletServer implements Runnable {
scanSession.nextBatchTask = null;
} catch (ExecutionException e) {
sessionManager.removeSession(scanID);
- if (e.getCause() instanceof NotServingTabletException)
+ if (e.getCause() instanceof NotServingTabletException) {
throw (NotServingTabletException) e.getCause();
- else if (e.getCause() instanceof TooManyFilesException)
+ } else if (e.getCause() instanceof TooManyFilesException) {
throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(
scanSession.extent.toThrift());
- else if (e.getCause() instanceof SampleNotPresentException)
+ } else if (e.getCause() instanceof SampleNotPresentException) {
throw new TSampleNotPresentException(scanSession.extent.toThrift());
- else if (e.getCause() instanceof IOException) {
+ } else if (e.getCause() instanceof IOException) {
sleepUninterruptibly(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
List<KVEntry> empty = Collections.emptyList();
bresult = new ScanBatch(empty, true);
@@ -690,10 +696,11 @@ public class TabletServer implements Runnable {
} catch (CancellationException ce) {
sessionManager.removeSession(scanID);
Tablet tablet = getOnlineTablet(scanSession.extent);
- if (tablet == null || tablet.isClosed())
+ if (tablet == null || tablet.isClosed()) {
throw new NotServingTabletException(scanSession.extent.toThrift());
- else
+ } else {
throw new NoSuchScanIDException();
+ }
} catch (TimeoutException e) {
List<TKeyValue> param = Collections.emptyList();
long timeout =
@@ -721,8 +728,9 @@ public class TabletServer implements Runnable {
scanSession, scanSession.nextBatchTask);
}
- if (!scanResult.more)
+ if (!scanResult.more) {
closeScan(tinfo, scanID);
+ }
return scanResult;
}
@@ -759,22 +767,25 @@ public class TabletServer implements Runnable {
tables.add(TableId.of(new String(keyExtent.getTable(), UTF_8)));
}
- if (tables.size() != 1)
+ if (tables.size() != 1) {
throw new IllegalArgumentException("Cannot batch scan over multiple tables");
+ }
// check if user has permission to the tables
for (TableId tableId : tables) {
NamespaceId namespaceId = getNamespaceId(credentials, tableId);
if (!security.canScan(credentials, tableId, namespaceId, tbatch, tcolumns, ssiList, ssio,
- authorizations))
+ authorizations)) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
+ }
}
try {
- if (!security.authenticatedUserHasAuthorizations(credentials, authorizations))
+ if (!security.authenticatedUserHasAuthorizations(credentials, authorizations)) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.BAD_AUTHORIZATIONS);
+ }
} catch (ThriftSecurityException tse) {
log.error("{} is not authorized", credentials.getPrincipal(), tse);
throw tse;
@@ -785,8 +796,9 @@ public class TabletServer implements Runnable {
// This is used to determine which thread pool to use
KeyExtent threadPoolExtent = batch.keySet().iterator().next();
- if (waitForWrites)
+ if (waitForWrites) {
writeTracker.waitForWrites(TabletType.type(batch.keySet()));
+ }
final MultiScanSession mss = new MultiScanSession(credentials, threadPoolExtent, batch,
ssiList, ssio, new Authorizations(authorizations),
@@ -798,8 +810,9 @@ public class TabletServer implements Runnable {
mss.numRanges += ranges.size();
}
- for (TColumn tcolumn : tcolumns)
+ for (TColumn tcolumn : tcolumns) {
mss.columnSet.add(new Column(tcolumn));
+ }
long sid = sessionManager.createSession(mss, true);
@@ -891,18 +904,20 @@ public class TabletServer implements Runnable {
// Make sure user is real
Durability durability = DurabilityImpl.fromThrift(tdurabilty);
security.authenticateUser(credentials, credentials);
- if (updateMetrics.isEnabled())
+ if (updateMetrics.isEnabled()) {
updateMetrics.add(TabletServerUpdateMetrics.PERMISSION_ERRORS, 0);
+ }
- UpdateSession us = new UpdateSession(new TservConstraintEnv(context, security, credentials),
- credentials, durability);
+ UpdateSession us = new UpdateSession(
+ new TservConstraintEnv(getContext(), security, credentials), credentials, durability);
return sessionManager.createSession(us, false);
}
private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
long t1 = System.currentTimeMillis();
- if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent))
+ if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent)) {
return;
+ }
if (us.currentTablet == null
&& (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
// if there were previous failures, then do not accept additional writes
@@ -917,7 +932,7 @@ public class TabletServer implements Runnable {
&& (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
tableId = keyExtent.getTableId();
if (sameTable || security.canWrite(us.getCredentials(), tableId,
- Tables.getNamespaceId(context, tableId))) {
+ Tables.getNamespaceId(getContext(), tableId))) {
long t2 = System.currentTimeMillis();
us.authTimes.addStat(t2 - t1);
us.currentTablet = getOnlineTablet(keyExtent);
@@ -927,8 +942,9 @@ public class TabletServer implements Runnable {
// not serving tablet, so report all mutations as
// failures
us.failures.put(keyExtent, 0L);
- if (updateMetrics.isEnabled())
+ if (updateMetrics.isEnabled()) {
updateMetrics.add(TabletServerUpdateMetrics.UNKNOWN_TABLET_ERRORS, 0);
+ }
}
} else {
log.warn("Denying access to table {} for user {}", keyExtent.getTableId(), us.getUser());
@@ -936,8 +952,9 @@ public class TabletServer implements Runnable {
us.authTimes.addStat(t2 - t1);
us.currentTablet = null;
us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED);
- if (updateMetrics.isEnabled())
+ if (updateMetrics.isEnabled()) {
updateMetrics.add(TabletServerUpdateMetrics.PERMISSION_ERRORS, 0);
+ }
return;
}
} catch (TableNotFoundException tnfe) {
@@ -946,8 +963,9 @@ public class TabletServer implements Runnable {
us.authTimes.addStat(t2 - t1);
us.currentTablet = null;
us.authFailures.put(keyExtent, SecurityErrorCode.TABLE_DOESNT_EXIST);
- if (updateMetrics.isEnabled())
+ if (updateMetrics.isEnabled()) {
updateMetrics.add(TabletServerUpdateMetrics.UNKNOWN_TABLET_ERRORS, 0);
+ }
return;
} catch (ThriftSecurityException e) {
log.error("Denying permission to check user " + us.getUser() + " with user " + e.getUser(),
@@ -956,8 +974,9 @@ public class TabletServer implements Runnable {
us.authTimes.addStat(t2 - t1);
us.currentTablet = null;
us.authFailures.put(keyExtent, e.getCode());
- if (updateMetrics.isEnabled())
+ if (updateMetrics.isEnabled()) {
updateMetrics.add(TabletServerUpdateMetrics.PERMISSION_ERRORS, 0);
+ }
return;
}
}
@@ -1017,12 +1036,15 @@ public class TabletServer implements Runnable {
long pt1 = System.currentTimeMillis();
boolean containsMetadataTablet = false;
- for (Tablet tablet : us.queuedMutations.keySet())
- if (tablet.getExtent().isMeta())
+ for (Tablet tablet : us.queuedMutations.keySet()) {
+ if (tablet.getExtent().isMeta()) {
containsMetadataTablet = true;
+ }
+ }
- if (!containsMetadataTablet && us.queuedMutations.size() > 0)
+ if (!containsMetadataTablet && us.queuedMutations.size() > 0) {
TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
+ }
try (TraceScope prep = Trace.startSpan("prep")) {
for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
@@ -1033,8 +1055,9 @@ public class TabletServer implements Runnable {
List<Mutation> mutations = entry.getValue();
if (mutations.size() > 0) {
try {
- if (updateMetrics.isEnabled())
+ if (updateMetrics.isEnabled()) {
updateMetrics.add(TabletServerUpdateMetrics.MUTATION_ARRAY_SIZE, mutations.size());
+ }
CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
if (commitSession == null) {
@@ -1053,8 +1076,9 @@ public class TabletServer implements Runnable {
} catch (TConstraintViolationException e) {
us.violations.add(e.getViolations());
- if (updateMetrics.isEnabled())
+ if (updateMetrics.isEnabled()) {
updateMetrics.add(TabletServerUpdateMetrics.CONSTRAINT_VIOLATIONS, 0);
+ }
if (e.getNonViolators().size() > 0) {
// only log and commit mutations if there were some
@@ -1142,18 +1166,21 @@ public class TabletServer implements Runnable {
}
private void updateWalogWriteTime(long time) {
- if (updateMetrics.isEnabled())
+ if (updateMetrics.isEnabled()) {
updateMetrics.add(TabletServerUpdateMetrics.WALOG_WRITE_TIME, time);
+ }
}
private void updateAvgCommitTime(long time, int size) {
- if (updateMetrics.isEnabled())
+ if (updateMetrics.isEnabled()) {
updateMetrics.add(TabletServerUpdateMetrics.COMMIT_TIME, (long) ((time) / (double) size));
+ }
}
private void updateAvgPrepTime(long time, int size) {
- if (updateMetrics.isEnabled())
+ if (updateMetrics.isEnabled()) {
updateMetrics.add(TabletServerUpdateMetrics.COMMIT_PREP, (long) ((time) / (double) size));
+ }
}
@Override
@@ -1216,9 +1243,10 @@ public class TabletServer implements Runnable {
final TableId tableId = TableId.of(new String(tkeyExtent.getTable(), UTF_8));
NamespaceId namespaceId = getNamespaceId(credentials, tableId);
- if (!security.canWrite(credentials, tableId, namespaceId))
+ if (!security.canWrite(credentials, tableId, namespaceId)) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
+ }
final KeyExtent keyExtent = new KeyExtent(tkeyExtent);
final Tablet tablet = getOnlineTablet(new KeyExtent(keyExtent));
if (tablet == null) {
@@ -1246,7 +1274,7 @@ public class TabletServer implements Runnable {
CommitSession cs;
try (TraceScope prep = Trace.startSpan("prep")) {
cs = tablet.prepareMutationsForCommit(
- new TservConstraintEnv(context, security, credentials), mutations);
+ new TservConstraintEnv(getContext(), security, credentials), mutations);
}
if (cs == null) {
throw new NotServingTabletException(tkeyExtent);
@@ -1280,7 +1308,7 @@ public class TabletServer implements Runnable {
private NamespaceId getNamespaceId(TCredentials credentials, TableId tableId)
throws ThriftSecurityException {
try {
- return Tables.getNamespaceId(context, tableId);
+ return Tables.getNamespaceId(getContext(), tableId);
} catch (TableNotFoundException e1) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.TABLE_DOESNT_EXIST);
@@ -1294,16 +1322,17 @@ public class TabletServer implements Runnable {
updates.entrySet().iterator();
final CompressedIterators compressedIters = new CompressedIterators(symbols);
- ConditionCheckerContext checkerContext = new ConditionCheckerContext(context, compressedIters,
- confFactory.getTableConfiguration(cs.tableId));
+ ConditionCheckerContext checkerContext = new ConditionCheckerContext(getContext(),
+ compressedIters, confFactory.getTableConfiguration(cs.tableId));
while (iter.hasNext()) {
final Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
final Tablet tablet = getOnlineTablet(entry.getKey());
if (tablet == null || tablet.isClosed()) {
- for (ServerConditionalMutation scm : entry.getValue())
+ for (ServerConditionalMutation scm : entry.getValue()) {
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ }
iter.remove();
} else {
final List<ServerConditionalMutation> okMutations =
@@ -1348,8 +1377,9 @@ public class TabletServer implements Runnable {
for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
final Tablet tablet = getOnlineTablet(entry.getKey());
if (tablet == null || tablet.isClosed() || sessionCanceled) {
- for (ServerConditionalMutation scm : entry.getValue())
+ for (ServerConditionalMutation scm : entry.getValue()) {
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ }
} else {
final Durability durability =
DurabilityImpl.resolveDurabilty(sess.durability, tablet.getDurability());
@@ -1361,14 +1391,16 @@ public class TabletServer implements Runnable {
if (mutations.size() > 0) {
CommitSession cs = tablet.prepareMutationsForCommit(
- new TservConstraintEnv(context, security, sess.credentials), mutations);
+ new TservConstraintEnv(getContext(), security, sess.credentials), mutations);
if (cs == null) {
- for (ServerConditionalMutation scm : entry.getValue())
+ for (ServerConditionalMutation scm : entry.getValue()) {
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+ }
} else {
- for (ServerConditionalMutation scm : entry.getValue())
+ for (ServerConditionalMutation scm : entry.getValue()) {
results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
+ }
if (durability != Durability.NONE) {
loggables.put(cs, new TabletMutations(cs, mutations, durability));
}
@@ -1382,14 +1414,16 @@ public class TabletServer implements Runnable {
loggables.put(cs, new TabletMutations(cs, e.getNonViolators(), durability));
}
sendables.put(cs, e.getNonViolators());
- for (Mutation m : e.getNonViolators())
+ for (Mutation m : e.getNonViolators()) {
results.add(
new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
+ }
}
- for (Mutation m : e.getViolators())
+ for (Mutation m : e.getViolators()) {
results.add(
new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED));
+ }
}
}
}
@@ -1462,15 +1496,18 @@ public class TabletServer implements Runnable {
TableId tableId = TableId.of(tableIdStr);
Authorizations userauths = null;
NamespaceId namespaceId = getNamespaceId(credentials, tableId);
- if (!security.canConditionallyUpdate(credentials, tableId, namespaceId))
+ if (!security.canConditionallyUpdate(credentials, tableId, namespaceId)) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
+ }
userauths = security.getUserAuthorizations(credentials);
- for (ByteBuffer auth : authorizations)
- if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
+ for (ByteBuffer auth : authorizations) {
+ if (!userauths.contains(ByteBufferUtil.toBytes(auth))) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.BAD_AUTHORIZATIONS);
+ }
+ }
ConditionalSession cs = new ConditionalSession(credentials,
new Authorizations(authorizations), tableId, DurabilityImpl.fromThrift(tdurabilty));
@@ -1486,8 +1523,9 @@ public class TabletServer implements Runnable {
ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
- if (cs == null || cs.interruptFlag.get())
+ if (cs == null || cs.interruptFlag.get()) {
throw new NoSuchScanIDException();
+ }
if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID)) {
try {
@@ -1507,10 +1545,12 @@ public class TabletServer implements Runnable {
Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations,
Translators.TKET, new Translator.ListTranslator<>(ServerConditionalMutation.TCMT));
- for (KeyExtent ke : updates.keySet())
- if (!ke.getTableId().equals(tid))
+ for (KeyExtent ke : updates.keySet()) {
+ if (!ke.getTableId().equals(tid)) {
throw new IllegalArgumentException(
"Unexpected table id " + tid + " != " + ke.getTableId());
+ }
+ }
ArrayList<TCMResult> results = new ArrayList<>();
@@ -1536,12 +1576,14 @@ public class TabletServer implements Runnable {
// after this method returns a conditional update should not be able to start
ConditionalSession cs = (ConditionalSession) sessionManager.getSession(sessID);
- if (cs != null)
+ if (cs != null) {
cs.interruptFlag.set(true);
+ }
cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
- if (cs != null)
+ if (cs != null) {
sessionManager.removeSession(sessID, true);
+ }
}
@Override
@@ -1556,9 +1598,10 @@ public class TabletServer implements Runnable {
TableId tableId = TableId.of(new String(ByteBufferUtil.toBytes(tkeyExtent.table)));
NamespaceId namespaceId = getNamespaceId(credentials, tableId);
- if (!security.canSplitTablet(credentials, tableId, namespaceId))
+ if (!security.canSplitTablet(credentials, tableId, namespaceId)) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
+ }
KeyExtent keyExtent = new KeyExtent(tkeyExtent);
@@ -1617,7 +1660,7 @@ public class TabletServer implements Runnable {
}
} catch (ThriftSecurityException e) {
log.warn("Got {} message from unauthenticatable user: {}", request, e.getUser());
- if (context.getCredentials().getToken().getClass().getName()
+ if (getContext().getCredentials().getToken().getClass().getName()
.equals(credentials.getTokenClassName())) {
log.error("Got message from a service with a mismatched configuration."
+ " Please ensure a compatible configuration.", e);
@@ -1641,7 +1684,7 @@ public class TabletServer implements Runnable {
if (lock != null) {
ZooUtil.LockID lid =
- new ZooUtil.LockID(context.getZooKeeperRoot() + Constants.ZMASTER_LOCK, lock);
+ new ZooUtil.LockID(getContext().getZooKeeperRoot() + Constants.ZMASTER_LOCK, lock);
try {
if (!ZooLock.isLockHeld(masterLockCache, lid)) {
@@ -1778,8 +1821,9 @@ public class TabletServer implements Runnable {
ByteBufferUtil.toText(startRow));
for (Tablet tablet : getOnlineTablets().values()) {
- if (ke.overlaps(tablet.getExtent()))
+ if (ke.overlaps(tablet.getExtent())) {
tabletsToFlush.add(tablet);
+ }
}
Long flushID = null;
@@ -1900,8 +1944,9 @@ public class TabletServer implements Runnable {
ArrayList<Tablet> tabletsToCompact = new ArrayList<>();
for (Tablet tablet : getOnlineTablets().values()) {
- if (ke.overlaps(tablet.getExtent()))
+ if (ke.overlaps(tablet.getExtent())) {
tabletsToCompact.add(tablet);
+ }
}
Pair<Long,UserCompactionConfig> compactionInfo = null;
@@ -1909,13 +1954,14 @@ public class TabletServer implements Runnable {
for (Tablet tablet : tabletsToCompact) {
// all for the same table id, so only need to read
// compaction id once
- if (compactionInfo == null)
+ if (compactionInfo == null) {
try {
compactionInfo = tablet.getCompactionID();
} catch (NoNodeException e) {
log.info("Asked to compact table with no compaction id {} {}", ke, e.getMessage());
return;
}
+ }
tablet.compactAll(compactionInfo.getFirst(), compactionInfo.getSecond());
}
@@ -2000,7 +2046,7 @@ public class TabletServer implements Runnable {
NamespaceId namespaceId;
TableId tableId = TableId.of(request.getTableId());
try {
- namespaceId = Tables.getNamespaceId(context, tableId);
+ namespaceId = Tables.getNamespaceId(getContext(), tableId);
} catch (TableNotFoundException e1) {
throw new ThriftTableOperationException(tableId.canonical(), null, null,
TableOperationExceptionType.NOTFOUND, null);
@@ -2011,10 +2057,10 @@ public class TabletServer implements Runnable {
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
- ServerConfigurationFactory factory = context.getServerConfFactory();
+ ServerConfigurationFactory factory = getContext().getServerConfFactory();
ExecutorService es = resourceManager.getSummaryPartitionExecutor();
- Future<SummaryCollection> future = new Gatherer(context, request,
- factory.getTableConfiguration(tableId), context.getCryptoService()).gather(es);
+ Future<SummaryCollection> future = new Gatherer(getContext(), request,
+ factory.getTableConfiguration(tableId), getContext().getCryptoService()).gather(es);
return startSummaryOperation(credentials, future);
}
@@ -2029,11 +2075,11 @@ public class TabletServer implements Runnable {
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
- ServerConfigurationFactory factory = context.getServerConfFactory();
+ ServerConfigurationFactory factory = getContext().getServerConfFactory();
ExecutorService spe = resourceManager.getSummaryRemoteExecutor();
- Future<SummaryCollection> future = new Gatherer(context, request,
+ Future<SummaryCollection> future = new Gatherer(getContext(), request,
factory.getTableConfiguration(TableId.of(request.getTableId())),
- context.getCryptoService()).processPartition(spe, modulus, remainder);
+ getContext().getCryptoService()).processPartition(spe, modulus, remainder);
return startSummaryOperation(credentials, future);
}
@@ -2056,8 +2102,8 @@ public class TabletServer implements Runnable {
Cache<String,Long> fileLenCache = resourceManager.getFileLenCache();
FileSystemResolver volMgr = p -> fs.getVolumeByPath(p).getFileSystem();
Future<SummaryCollection> future =
- new Gatherer(context, request, tableCfg, context.getCryptoService()).processFiles(volMgr,
- files, summaryCache, indexCache, fileLenCache, srp);
+ new Gatherer(getContext(), request, tableCfg, getContext().getCryptoService())
+ .processFiles(volMgr, files, summaryCache, indexCache, fileLenCache, srp);
return startSummaryOperation(credentials, future);
}
@@ -2303,10 +2349,10 @@ public class TabletServer implements Runnable {
|| (extent.isMeta()
&& !getConfiguration().getBoolean(Property.MASTER_METADATA_SUSPENDABLE))) {
log.debug("Unassigning {}", tls);
- TabletStateStore.unassign(context, tls, null);
+ TabletStateStore.unassign(getContext(), tls, null);
} else {
log.debug("Suspending " + tls);
- TabletStateStore.suspend(context, tls, null,
+ TabletStateStore.suspend(getContext(), tls, null,
requestTimeSkew + MILLISECONDS.convert(System.nanoTime(), NANOSECONDS));
}
} catch (DistributedStoreException ex) {
@@ -2355,8 +2401,9 @@ public class TabletServer implements Runnable {
Set<KeyExtent> onlineOverlapping =
KeyExtent.findOverlapping(extent, onlineTablets.snapshot());
- if (openingOverlapping.contains(extent) || onlineOverlapping.contains(extent))
+ if (openingOverlapping.contains(extent) || onlineOverlapping.contains(extent)) {
return;
+ }
if (!unopenedOverlapping.contains(extent)) {
log.info("assignment {} no longer in the unopened set", extent);
@@ -2383,7 +2430,7 @@ public class TabletServer implements Runnable {
SortedMap<Key,Value> tabletsKeyValues = new TreeMap<>();
try {
Pair<Text,KeyExtent> pair =
- verifyTabletInformation(context, extent, TabletServer.this.getTabletSession(),
+ verifyTabletInformation(getContext(), extent, TabletServer.this.getTabletSession(),
tabletsKeyValues, getClientAddressString(), getLock());
if (pair != null) {
locationToOpen = pair.getFirst();
@@ -2436,7 +2483,7 @@ public class TabletServer implements Runnable {
resourceManager.createTabletResourceManager(extent, getTableConfiguration(extent));
TabletData data;
if (extent.isRootTablet()) {
- data = new TabletData(context, fs, getTableConfiguration(extent));
+ data = new TabletData(getContext(), fs, getTableConfiguration(extent));
} else {
data = new TabletData(extent, fs, tabletsKeyValues.entrySet().iterator());
}
@@ -2460,7 +2507,7 @@ public class TabletServer implements Runnable {
throw new RuntimeException("Minor compaction after recovery fails for " + extent);
}
Assignment assignment = new Assignment(extent, getTabletSession());
- TabletStateStore.setLocation(context, assignment);
+ TabletStateStore.setLocation(getContext(), assignment);
synchronized (openingTablets) {
synchronized (onlineTablets) {
@@ -2480,7 +2527,7 @@ public class TabletServer implements Runnable {
}
TableId tableId = extent.getTableId();
- ProblemReports.getInstance(context).report(new ProblemReport(tableId, TABLET_LOAD,
+ ProblemReports.getInstance(getContext()).report(new ProblemReport(tableId, TABLET_LOAD,
extent.getUUID().toString(), getClientAddressString(), e));
} finally {
releaseRecoveryMemory(extent);
@@ -2538,7 +2585,7 @@ public class TabletServer implements Runnable {
TProcessor processor, String threadName) throws UnknownHostException {
Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null
? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
- ServerAddress sp = TServerUtils.startServer(context, address, portHint, processor,
+ ServerAddress sp = TServerUtils.startServer(getContext(), address, portHint, processor,
this.getClass().getSimpleName(), threadName, Property.TSERV_PORTSEARCH,
Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK, maxMessageSizeProperty);
this.server = sp.server;
@@ -2547,9 +2594,10 @@ public class TabletServer implements Runnable {
private HostAndPort getMasterAddress() {
try {
- List<String> locations = context.getMasterLocations();
- if (locations.size() == 0)
+ List<String> locations = getContext().getMasterLocations();
+ if (locations.size() == 0) {
return null;
+ }
return HostAndPort.fromString(locations.get(0));
} catch (Exception e) {
log.warn("Failed to obtain master host " + e);
@@ -2565,7 +2613,7 @@ public class TabletServer implements Runnable {
return null;
}
// log.info("Listener API to master has been opened");
- return ThriftUtil.getClient(new MasterClientService.Client.Factory(), address, context);
+ return ThriftUtil.getClient(new MasterClientService.Client.Factory(), address, getContext());
} catch (Exception e) {
log.warn("Issue with masterConnection (" + address + ") " + e, e);
}
@@ -2581,15 +2629,15 @@ public class TabletServer implements Runnable {
clientHandler = new ThriftClientHandler();
Iface rpcProxy = TraceUtil.wrapService(clientHandler);
final Processor<Iface> processor;
- if (context.getThriftServerType() == ThriftServerType.SASL) {
+ if (getContext().getThriftServerType() == ThriftServerType.SASL) {
Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, ThriftClientHandler.class,
getConfiguration());
processor = new Processor<>(tcredProxy);
} else {
processor = new Processor<>(rpcProxy);
}
- HostAndPort address = startServer(context.getServerConfFactory().getSystemConfiguration(),
- clientAddress.getHost(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server");
+ HostAndPort address = startServer(getConfiguration(), clientAddress.getHost(),
+ Property.TSERV_CLIENTPORT, processor, "Thrift Client Server");
log.info("address = {}", address);
return address;
}
@@ -2601,10 +2649,10 @@ public class TabletServer implements Runnable {
TCredentialsUpdatingWrapper.service(rpcProxy, handler.getClass(), getConfiguration());
ReplicationServicer.Processor<ReplicationServicer.Iface> processor =
new ReplicationServicer.Processor<>(repl);
- AccumuloConfiguration conf = context.getServerConfFactory().getSystemConfiguration();
- Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null
- ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
- ServerAddress sp = TServerUtils.startServer(context, clientAddress.getHost(),
+ Property maxMessageSizeProperty =
+ getConfiguration().get(Property.TSERV_MAX_MESSAGE_SIZE) != null
+ ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE;
+ ServerAddress sp = TServerUtils.startServer(getContext(), clientAddress.getHost(),
Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, "ReplicationServicerHandler",
"Replication Servicer", Property.TSERV_PORTSEARCH, Property.REPLICATION_MIN_THREADS,
Property.REPLICATION_THREADCHECK, maxMessageSizeProperty);
@@ -2615,8 +2663,8 @@ public class TabletServer implements Runnable {
// The replication service is unique to the thrift service for a tserver, not just a host.
// Advertise the host and port for replication service given the host and port for the
// tserver.
- context.getZooReaderWriter().putPersistentData(
- context.getZooKeeperRoot() + ReplicationConstants.ZOO_TSERVERS + "/" + clientAddress,
+ getContext().getZooReaderWriter().putPersistentData(
+ getContext().getZooKeeperRoot() + ReplicationConstants.ZOO_TSERVERS + "/" + clientAddress,
sp.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
} catch (Exception e) {
log.error("Could not advertise replication service port", e);
@@ -2629,10 +2677,10 @@ public class TabletServer implements Runnable {
}
private void announceExistence() {
- ZooReaderWriter zoo = context.getZooReaderWriter();
+ ZooReaderWriter zoo = getContext().getZooReaderWriter();
try {
String zPath =
- context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + getClientAddressString();
+ getContext().getZooKeeperRoot() + Constants.ZTSERVERS + "/" + getClientAddressString();
try {
zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP);
@@ -2651,8 +2699,9 @@ public class TabletServer implements Runnable {
@Override
public void lostLock(final LockLossReason reason) {
Halt.halt(serverStopRequested ? 0 : 1, () -> {
- if (!serverStopRequested)
+ if (!serverStopRequested) {
log.error("Lost tablet server lock (reason = {}), exiting.", reason);
+ }
gcLogger.logGCInfo(getConfiguration());
});
}
@@ -2672,7 +2721,7 @@ public class TabletServer implements Runnable {
if (tabletServerLock.tryLock(lw, lockContent)) {
log.debug("Obtained tablet server lock {}", tabletServerLock.getLockPath());
lockID = tabletServerLock.getLockID()
- .serialize(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/");
+ .serialize(getContext().getZooKeeperRoot() + Constants.ZTSERVERS + "/");
return;
}
log.info("Waiting for tablet server lock");
@@ -2690,13 +2739,13 @@ public class TabletServer implements Runnable {
// main loop listens for client requests
@Override
public void run() {
- SecurityUtil.serverLogin(context.getConfiguration());
+ SecurityUtil.serverLogin(getConfiguration());
// To make things easier on users/devs, and to avoid creating an upgrade path to 1.7
// We can just make the zookeeper paths before we try to use.
try {
- ZooKeeperInitialization.ensureZooKeeperInitialized(context.getZooReaderWriter(),
- context.getZooKeeperRoot());
+ ZooKeeperInitialization.ensureZooKeeperInitialized(getContext().getZooReaderWriter(),
+ getContext().getZooKeeperRoot());
} catch (KeeperException | InterruptedException e) {
log.error("Could not ensure that ZooKeeper is properly initialized", e);
throw new RuntimeException(e);
@@ -2745,9 +2794,10 @@ public class TabletServer implements Runnable {
getConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
bulkFailedCopyQ = new DistributedWorkQueue(
- context.getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ, getConfiguration());
+ getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ, getConfiguration());
try {
- bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(context), distWorkQThreadPool);
+ bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(getContext()),
+ distWorkQThreadPool);
} catch (Exception e1) {
throw new RuntimeException("Failed to start distributed work queue for copying ", e1);
}
@@ -2800,7 +2850,7 @@ public class TabletServer implements Runnable {
&& client.getOutputProtocol().getTransport() != null
&& client.getOutputProtocol().getTransport().isOpen()) {
try {
- mm.send(context.rpcCreds(), getClientAddressString(), iface);
+ mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
mm = null;
} catch (TException ex) {
log.warn("Error sending message: queuing message again");
@@ -2932,8 +2982,9 @@ public class TabletServer implements Runnable {
return verifyRootTablet(context, instance);
}
TableId tableToVerify = MetadataTable.ID;
- if (extent.isMeta())
+ if (extent.isMeta()) {
tableToVerify = RootTable.ID;
+ }
List<ColumnFQ> columnsToFetch =
Arrays.asList(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN,
@@ -2945,8 +2996,9 @@ public class TabletServer implements Runnable {
TreeMap<Key,Value> tkv = new TreeMap<>();
try (ScannerImpl scanner = new ScannerImpl(context, tableToVerify, Authorizations.EMPTY)) {
scanner.setRange(extent.toMetadataRange());
- for (Entry<Key,Value> entry : scanner)
+ for (Entry<Key,Value> entry : scanner) {
tkv.put(entry.getKey(), entry.getValue());
+ }
}
// only populate map after success
@@ -2956,8 +3008,9 @@ public class TabletServer implements Runnable {
Text metadataEntry = extent.getMetadataEntry();
Value dir = checkTabletMetadata(extent, instance, tabletsKeyValues, metadataEntry);
- if (dir == null)
+ if (dir == null) {
return null;
+ }
Value oldPrevEndRow = null;
for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
@@ -3049,15 +3102,17 @@ public class TabletServer implements Runnable {
}
public String getClientAddressString() {
- if (clientAddress == null)
+ if (clientAddress == null) {
return null;
+ }
return clientAddress.getHost() + ":" + clientAddress.getPort();
}
public TServerInstance getTabletSession() {
String address = getClientAddressString();
- if (address == null)
+ if (address == null) {
return null;
+ }
try {
return new TServerInstance(address, tabletServerLock.getSessionId());
@@ -3068,13 +3123,13 @@ public class TabletServer implements Runnable {
}
private void config() {
- log.info("Tablet server starting on {}", context.getHostname());
+ log.info("Tablet server starting on {}", getHostname());
majorCompactorThread =
new Daemon(new LoggingRunnable(log, new MajorCompactor(getConfiguration())));
majorCompactorThread.setName("Split/MajC initiator");
majorCompactorThread.start();
- clientAddress = HostAndPort.fromParts(context.getHostname(), 0);
+ clientAddress = HostAndPort.fromParts(getHostname(), 0);
try {
AccumuloVFSClassLoader.getContextManager()
.setContextConfig(new ContextManager.DefaultContextsConfig() {
@@ -3090,7 +3145,7 @@ public class TabletServer implements Runnable {
// A task that cleans up unused classloader contexts
Runnable contextCleaner = () -> {
- Set<String> contextProperties = context.getServerConfFactory().getSystemConfiguration()
+ Set<String> contextProperties = getConfiguration()
.getAllPropertiesWithPrefix(Property.VFS_CONTEXT_CLASSPATH_PROPERTY).keySet();
Set<String> configuredContexts = new HashSet<>();
for (String prop : contextProperties) {
@@ -3149,14 +3204,18 @@ public class TabletServer implements Runnable {
table.scanRate += tablet.scanRate();
long recsInMemory = tablet.getNumEntriesInMemory();
table.recsInMemory += recsInMemory;
- if (tablet.isMinorCompactionRunning())
+ if (tablet.isMinorCompactionRunning()) {
table.minors.running++;
- if (tablet.isMinorCompactionQueued())
+ }
+ if (tablet.isMinorCompactionQueued()) {
table.minors.queued++;
- if (tablet.isMajorCompactionRunning())
+ }
+ if (tablet.isMajorCompactionRunning()) {
table.majors.running++;
- if (tablet.isMajorCompactionQueued())
+ }
+ if (tablet.isMajorCompactionQueued()) {
table.majors.queued++;
+ }
});
scanCounts.forEach((tableId, mapCounter) -> {
@@ -3166,8 +3225,9 @@ public class TabletServer implements Runnable {
tables.put(tableId.canonical(), table);
}
- if (table.scans == null)
+ if (table.scans == null) {
table.scans = new Compacting();
+ }
table.scans.queued += mapCounter.getInt(ScanRunState.QUEUED);
table.scans.running += mapCounter.getInt(ScanRunState.RUNNING);
@@ -3212,29 +3272,6 @@ public class TabletServer implements Runnable {
return result;
}
- public static void main(String[] args) throws Exception {
- final String app = "tserver";
- ServerOpts opts = new ServerOpts();
- opts.parseArgs(app, args);
- ServerContext context = new ServerContext(opts.getSiteConfiguration());
- context.setupServer(app, TabletServer.class.getSimpleName(), opts.getAddress());
- context.setupCrypto();
- try {
- final TabletServer server = new TabletServer(context);
- if (UserGroupInformation.isSecurityEnabled()) {
- UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
- loginUser.doAs((PrivilegedExceptionAction<Void>) () -> {
- server.run();
- return null;
- });
- } else {
- server.run();
- }
- } finally {
- context.teardownServer();
- }
- }
-
private Durability getMincEventDurability(KeyExtent extent) {
TableConfiguration conf;
if (extent.isMeta()) {
@@ -3271,9 +3308,10 @@ public class TabletServer implements Runnable {
if (fs.exists(finished)) {
recovery = finished.getParent();
}
- if (recovery == null)
+ if (recovery == null) {
throw new IOException(
"Unable to find recovery files for extent " + extent + " logEntry: " + entry);
+ }
recoveryLogs.add(recovery);
}
logger.recover(fs, extent, recoveryLogs, tabletFiles, mutationReceiver);