You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/07/10 22:02:33 UTC
[accumulo] 01/02: ACCUMULO-4074 Added support for multiple scan
executors (#549)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit aaa757e4372a22b17201c1bee96b243b620345d3
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue Jul 3 17:08:40 2018 -0400
ACCUMULO-4074 Added support for multiple scan executors (#549)
* Created plugin API for prioritizing scans that avoids accessing internal
types. This should make user written comparators more stable over time.
* Made it possible to configure a comparator for each scan executor. Was
previously a single comparator type for all executors.
* Made it possible to pass options for comparator creation
* Made it possible to configure a per table dispatcher. This dispatcher
chooses which scan executor to use. Dispatchers are user written plugins.
* Fixed comparator type casting. I don't think the comparators in #510 worked
because the Runnables were never the type expected. Also, abstracted all of
this away from a user who may write a comparator.
---
core/pom.xml | 19 ++
.../accumulo/core/conf/AccumuloConfiguration.java | 136 +++++++++
.../accumulo/core/conf/DefaultConfiguration.java | 5 +
.../org/apache/accumulo/core/conf/Property.java | 55 +++-
.../accumulo/core/conf/SiteConfiguration.java | 10 +
.../core/spi/common/IteratorConfiguration.java | 30 +-
.../common/Stats.java} | 36 ++-
.../core/spi/scan/IdleRatioScanPrioritizer.java | 51 ++++
.../accumulo/core/spi/scan/ScanDispatcher.java | 52 ++++
.../accumulo/core/spi/scan/ScanExecutor.java | 60 ++++
.../apache/accumulo/core/spi/scan/ScanInfo.java | 116 ++++++++
.../accumulo/core/spi/scan/ScanPrioritizer.java | 22 +-
.../core/spi/scan/SimpleScanDispatcher.java | 74 +++++
.../util/AccumuloUncaughtExceptionHandler.java | 1 -
.../accumulo/core/util/NamingThreadFactory.java | 16 +-
.../java/org/apache/accumulo/core/util/Stat.java | 75 ++---
.../core/conf/AccumuloConfigurationTest.java | 112 +++++++-
.../spi/scan/IdleRatioScanPrioritizerTest.java | 61 +++++
.../core/spi/scan/SimpleScanDispatcherTest.java | 62 +++++
.../accumulo/core/spi/scan/TestScanInfo.java | 101 +++++++
.../org/apache/accumulo/core/util/StatTest.java | 40 ++-
.../accumulo/server/conf/TableConfiguration.java | 37 +++
.../accumulo/server/conf/ZooConfiguration.java | 13 +-
.../org/apache/accumulo/tserver/TabletServer.java | 41 ++-
.../tserver/TabletServerResourceManager.java | 304 +++++++++++----------
.../accumulo/tserver/scan/NextBatchTask.java | 7 +-
.../tserver/session/DefaultSessionComparator.java | 67 -----
.../accumulo/tserver/session/MultiScanSession.java | 32 +--
.../accumulo/tserver/session/ScanSession.java | 162 ++++++++---
.../apache/accumulo/tserver/session/Session.java | 18 +-
.../accumulo/tserver/session/SessionManager.java | 11 +-
.../{ScanSession.java => SingleScanSession.java} | 29 +-
.../org/apache/accumulo/tserver/tablet/Tablet.java | 3 +-
.../tserver/session/SessionComparatorTest.java | 170 ------------
.../org/apache/accumulo/test/IMMLGBenchmark.java | 2 +-
.../test/performance/scan/CollectTabletStats.java | 5 +-
36 files changed, 1411 insertions(+), 624 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index cf45283..4535f50 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -212,6 +212,25 @@
</allows>
</configuration>
</execution>
+ <execution>
+ <id>apilyzer-spi</id>
+ <goals>
+ <goal>analyze</goal>
+ </goals>
+ <configuration>
+ <outputFile>${project.build.directory}/apilyzer-spi.txt</outputFile>
+ <includes>
+ <include>org[.]apache[.]accumulo[.]core[.]spi[.].*</include>
+ </includes>
+ <excludes />
+ <allows>
+ <allow>org[.]apache[.]hadoop[.]io[.]Text</allow>
+ <allow>org[.]apache[.]accumulo[.]core[.]client(?!.*[.](impl|thrift)[.].*)[.].*</allow>
+ <allow>org[.]apache[.]accumulo[.]core[.]data(?!.*[.](impl|thrift)[.].*)[.].*</allow>
+ <allow>org[.]apache[.]accumulo[.]core[.]security(?!.*[.](crypto)[.].*)[.].*</allow>
+ </allows>
+ </configuration>
+ </execution>
</executions>
</plugin>
<plugin>
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
index 5a9ce21..5427d79 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
@@ -16,12 +16,17 @@
*/
package org.apache.accumulo.core.conf;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -29,10 +34,12 @@ import java.util.function.Predicate;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.PropertyType.PortRange;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
import org.apache.accumulo.core.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
/**
@@ -348,6 +355,135 @@ public abstract class AccumuloConfiguration implements Iterable<Entry<String,Str
return maxFilesPerTablet;
}
+ public class ScanExecutorConfig {
+ public final String name;
+ public final int maxThreads;
+ public final OptionalInt priority;
+ public final Optional<String> prioritizerClass;
+ public final Map<String,String> prioritizerOpts;
+
+ public ScanExecutorConfig(String name, int maxThreads, OptionalInt priority,
+ Optional<String> comparatorFactory, Map<String,String> comparatorFactoryOpts) {
+ this.name = name;
+ this.maxThreads = maxThreads;
+ this.priority = priority;
+ this.prioritizerClass = comparatorFactory;
+ this.prioritizerOpts = comparatorFactoryOpts;
+ }
+
+ /**
+ * Re-reads the max threads from the configuration that created this class. If
+ * {@code getDeprecatedScanThreads(name)} returns a value, that value is used. Otherwise the
+ * {@code threads} property of this executor is looked up under
+ * {@link Property#TSERV_SCAN_EXECUTORS_PREFIX} and parsed as an integer.
+ */
+ public int getCurrentMaxThreads() {
+ Integer depThreads = getDeprecatedScanThreads(name);
+ if (depThreads != null) {
+ return depThreads;
+ }
+
+ String prop = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + name + "." + SCAN_EXEC_THREADS;
+ String val = getAllPropertiesWithPrefix(Property.TSERV_SCAN_EXECUTORS_PREFIX).get(prop);
+ return Integer.parseInt(val);
+ }
+ }
+
+ public boolean isPropertySet(Property prop) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Determines whether the thread count for a scan executor should be taken from a deprecated
+ * read ahead property. Only the executor named by
+ * {@link SimpleScanDispatcher#DEFAULT_SCAN_EXECUTOR_NAME} and the executor named {@code meta}
+ * have a deprecated equivalent.
+ *
+ * @param name
+ * the scan executor name
+ * @return the value of the deprecated property when it is set and its replacement is not set,
+ * otherwise null
+ */
+ @SuppressWarnings("deprecation")
+ Integer getDeprecatedScanThreads(String name) {
+
+ Property prop;
+ Property deprecatedProp;
+
+ if (name.equals(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME)) {
+ prop = Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS;
+ deprecatedProp = Property.TSERV_READ_AHEAD_MAXCONCURRENT;
+ } else if (name.equals("meta")) {
+ prop = Property.TSERV_SCAN_EXECUTORS_META_THREADS;
+ deprecatedProp = Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT;
+ } else {
+ return null;
+ }
+
+ if (!isPropertySet(prop) && isPropertySet(deprecatedProp)) {
+ // name the deprecated property first and its replacement second
+ log.warn("Property {} is deprecated, use {} instead.", deprecatedProp.getKey(),
+ prop.getKey());
+ return Integer.valueOf(get(deprecatedProp));
+ } else if (isPropertySet(prop) && isPropertySet(deprecatedProp)) {
+ log.warn("Deprecated property {} ignored because {} is set", deprecatedProp.getKey(),
+ prop.getKey());
+ }
+
+ return null;
+ }
+
+ private static final String SCAN_EXEC_THREADS = "threads";
+ private static final String SCAN_EXEC_PRIORITY = "priority";
+ private static final String SCAN_EXEC_PRIORITIZER = "prioritizer";
+ private static final String SCAN_EXEC_PRIORITIZER_OPTS = "prioritizer.opts.";
+
+ /**
+ * Parses every property under {@link Property#TSERV_SCAN_EXECUTORS_PREFIX} and builds one
+ * {@link ScanExecutorConfig} per executor name. The text after the prefix is split at the first
+ * dot into an executor name and an option. Recognized options are {@code threads},
+ * {@code priority}, {@code prioritizer} and {@code prioritizer.opts.<key>}.
+ *
+ * @return a config object for each executor name found
+ * @throws IllegalStateException
+ * if a property names an executor but no option, or names an option that is not
+ * recognized
+ * @throws IllegalArgumentException
+ * if an executor has no thread count or a thread count that is not positive
+ */
+ public Collection<ScanExecutorConfig> getScanExecutors() {
+
+ Map<String,Map<String,String>> propsByName = new HashMap<>();
+
+ List<ScanExecutorConfig> scanResources = new ArrayList<>();
+
+ for (Entry<String,String> entry : getAllPropertiesWithPrefix(
+ Property.TSERV_SCAN_EXECUTORS_PREFIX).entrySet()) {
+
+ String suffix = entry.getKey()
+ .substring(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey().length());
+ String[] tokens = suffix.split("\\.", 2);
+ if (tokens.length != 2) {
+ // a key without a dot after the executor name has no option, fail with a clear message
+ // instead of an ArrayIndexOutOfBoundsException
+ throw new IllegalStateException("Invalid scan executor property : " + entry.getKey());
+ }
+ String name = tokens[0];
+
+ propsByName.computeIfAbsent(name, k -> new HashMap<>()).put(tokens[1], entry.getValue());
+ }
+
+ for (Entry<String,Map<String,String>> entry : propsByName.entrySet()) {
+ String name = entry.getKey();
+ Integer threads = null;
+ Integer prio = null;
+ String prioritizerClass = null;
+ Map<String,String> prioritizerOpts = new HashMap<>();
+
+ for (Entry<String,String> subEntry : entry.getValue().entrySet()) {
+ String opt = subEntry.getKey();
+ String val = subEntry.getValue();
+
+ if (opt.equals(SCAN_EXEC_THREADS)) {
+ // a deprecated read ahead property, when applicable, overrides the configured value
+ Integer depThreads = getDeprecatedScanThreads(name);
+ if (depThreads == null) {
+ threads = Integer.parseInt(val);
+ } else {
+ threads = depThreads;
+ }
+ } else if (opt.equals(SCAN_EXEC_PRIORITY)) {
+ prio = Integer.parseInt(val);
+ } else if (opt.equals(SCAN_EXEC_PRIORITIZER)) {
+ prioritizerClass = val;
+ } else if (opt.startsWith(SCAN_EXEC_PRIORITIZER_OPTS)) {
+ String key = opt.substring(SCAN_EXEC_PRIORITIZER_OPTS.length());
+ if (key.isEmpty()) {
+ throw new IllegalStateException("Invalid scan executor option : " + opt);
+ }
+ prioritizerOpts.put(key, val);
+ } else {
+ throw new IllegalStateException("Unkown scan executor option : " + opt);
+ }
+ }
+
+ Preconditions.checkArgument(threads != null && threads > 0,
+ "Scan resource %s incorrectly specified threads", name);
+
+ scanResources.add(new ScanExecutorConfig(name, threads,
+ prio == null ? OptionalInt.empty() : OptionalInt.of(prio),
+ Optional.ofNullable(prioritizerClass), prioritizerOpts));
+ }
+
+ return scanResources;
+ }
+
/**
* Invalidates the <code>ZooCache</code> used for storage and quick retrieval of properties for
* this configuration.
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
index fc2891e..54b87cc 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
@@ -52,4 +52,9 @@ public class DefaultConfiguration extends AccumuloConfiguration {
resolvedProps.entrySet().stream().filter(p -> filter.test(p.getKey()))
.forEach(e -> props.put(e.getKey(), e.getValue()));
}
+
+ @Override
+ public boolean isPropertySet(Property prop) {
+ return false;
+ }
}
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 98402ef..b7ff210 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -30,6 +30,9 @@ import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
import org.apache.accumulo.core.util.format.DefaultFormatter;
import org.apache.accumulo.core.util.interpret.DefaultScanInterpreter;
import org.apache.accumulo.start.classloader.AccumuloClassLoader;
@@ -430,14 +433,37 @@ public enum Property {
"When a tablet server's SimpleTimer thread triggers to check idle"
+ " sessions, this configurable option will be used to evaluate update"
+ " sessions to determine if they can be closed due to inactivity"),
+ @Deprecated
TSERV_READ_AHEAD_MAXCONCURRENT("tserver.readahead.concurrent.max", "16", PropertyType.COUNT,
- "The maximum number of concurrent read ahead that will execute. This effectively"
- + " limits the number of long running scans that can run concurrently per tserver."),
- TSERV_READ_AHEAD_PREFIX("tserver.readahead.concurrent.table.", null, PropertyType.PREFIX,
- "Properties in this category allow overriding of table specific read ahead pools"),
+ "This property is deprecated since 2.0.0, use tserver.scan.executors.default.threads "
+ + "instead. The maximum number of concurrent read ahead that will execute. This "
+ + "effectively limits the number of long running scans that can run concurrently "
+ + "per tserver."),
+ @Deprecated
TSERV_METADATA_READ_AHEAD_MAXCONCURRENT("tserver.metadata.readahead.concurrent.max", "8",
PropertyType.COUNT,
- "The maximum number of concurrent metadata read ahead that will execute."),
+ "This property is deprecated since 2.0.0, use tserver.scan.executors.meta.threads instead. "
+ + "The maximum number of concurrent metadata read ahead that will execute."),
+ TSERV_SCAN_EXECUTORS_PREFIX("tserver.scan.executors.", null, PropertyType.PREFIX,
+ "Prefix for defining executors to service scans. For each executor the number of threads, "
+ + "thread priority, and an optional prioritizer can be configured. The prioritizer "
+ + "determines which scan an executor should run first and must implement "
+ + ScanPrioritizer.class.getName() + ". Tables can select an executor by setting"
+ + " table.scan.dispatcher. To configure a new executor, set "
+ + "tserver.scan.executors.<name>.threads=<number>. Optionally, can also set "
+ + "tserver.scan.executors.<name>.priority=<number 1 to 10>, "
+ + "tserver.scan.executors.<name>.prioritizer=<class name>, and "
+ + "tserver.scan.executors.<name>.prioritizer.opts.<key>=<value>"),
+ TSERV_SCAN_EXECUTORS_DEFAULT_THREADS("tserver.scan.executors.default.threads", "16",
+ PropertyType.COUNT,
+ "The number of threads for the scan executor that tables use by default."),
+ TSERV_SCAN_EXECUTORS_DEFAULT_PRIORITIZER("tserver.scan.executors.default.prioritizer", "",
+ PropertyType.STRING,
+ "Prioritizer for the default scan executor. Defaults to none which "
+ + "results in FIFO priority. Set to a class that implements "
+ + ScanPrioritizer.class.getName() + " to configure one."),
+ TSERV_SCAN_EXECUTORS_META_THREADS("tserver.scan.executors.meta.threads", "8", PropertyType.COUNT,
+ "The number of threads for the metadata table scan executor."),
TSERV_MIGRATE_MAXCONCURRENT("tserver.migrations.concurrent.max", "1", PropertyType.COUNT,
"The maximum number of concurrent tablet migrations for a tablet server"),
TSERV_MAJC_MAXCONCURRENT("tserver.compaction.major.concurrent.max", "3", PropertyType.COUNT,
@@ -541,9 +567,6 @@ public enum Property {
TSERV_SUMMARY_RETRIEVAL_THREADS("tserver.summary.retrieval.threads", "10", PropertyType.COUNT,
"The number of threads on each tablet server available to retrieve"
+ " summary data, that is not currently in cache, from RFiles."),
- TSERV_SESSION_COMPARATOR_CLASS("tserver.summary.comparator.class", "", PropertyType.CLASSNAME,
- "A customizable Scan session comparator. Note that by default, the value is empty"
- + " and thus uses no session comparator"),
// accumulo garbage collector properties
GC_PREFIX("gc.", null, PropertyType.PREFIX,
@@ -669,6 +692,19 @@ public enum Property {
PropertyType.BYTES,
"The max RFile size used for a merging minor compaction. The default"
+ " value of 0 disables a max file size."),
+ TABLE_SCAN_DISPATCHER("table.scan.dispatcher", SimpleScanDispatcher.class.getName(),
+ PropertyType.CLASSNAME,
+ "This class is used to dynamically dispatch scans to configured scan executors. This setting"
+ + " defaults to " + SimpleScanDispatcher.class.getSimpleName()
+ + " which dispatches to an executor"
+ + " named 'default' when it is optionless. Setting the option "
+ + "'table.scan.dispatcher.opts.executor=<name>' causes "
+ + SimpleScanDispatcher.class.getSimpleName() + " to dispatch to the specified executor. "
+ + "It has more options listed in its javadoc. Configured classes must implement "
+ + ScanDispatcher.class.getName() + ". This property is ignored for the root and metadata"
+ + " table. The metadata table always dispatches to a scan executor named `meta`."),
+ TABLE_SCAN_DISPATCHER_OPTS("table.scan.dispatcher.opts.", null, PropertyType.PREFIX,
+ "Options for the table scan dispatcher"),
TABLE_SCAN_MAXMEM("table.scan.max.memory", "512K", PropertyType.BYTES,
"The maximum amount of memory that will be used to cache results of a client query/scan. "
+ "Once this limit is reached, the buffered data is sent to the client."),
@@ -1153,7 +1189,8 @@ public enum Property {
|| key.startsWith(Property.TABLE_REPLICATION_TARGET.getKey())
|| key.startsWith(Property.TABLE_ARBITRARY_PROP_PREFIX.getKey())
|| key.startsWith(TABLE_SAMPLER_OPTS.getKey())
- || key.startsWith(TABLE_SUMMARIZER_PREFIX.getKey())));
+ || key.startsWith(TABLE_SUMMARIZER_PREFIX.getKey())
+ || key.startsWith(TABLE_SCAN_DISPATCHER_OPTS.getKey())));
}
/**
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
index 001b1db..cf0fe25 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* An {@link AccumuloConfiguration} which first loads any properties set on the command-line (using
* the -o option) and then from an XML file, usually accumulo-site.xml. This implementation supports
@@ -165,6 +167,14 @@ public class SiteConfiguration extends AccumuloConfiguration {
}
@Override
+ public boolean isPropertySet(Property prop) {
+ Preconditions.checkArgument(!prop.isSensitive(),
+ "This method not implemented for sensitive props");
+ return CliConfiguration.get(prop) != null || staticConfigs.containsKey(prop.getKey())
+ || getXmlConfig().get(prop.getKey()) != null || parent.isPropertySet(prop);
+ }
+
+ @Override
public void getProperties(Map<String,String> props, Predicate<String> filter) {
getProperties(props, filter, true);
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java b/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
similarity index 58%
rename from server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
rename to core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
index 28e6ef2..40f05e5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
@@ -14,23 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.accumulo.tserver.session;
+package org.apache.accumulo.core.spi.common;
-public class SingleRangePriorityComparator extends DefaultSessionComparator {
+import java.util.Map;
- @Override
- public int compareSession(Session sessionA, Session sessionB) {
- int priority = super.compareSession(sessionA, sessionB);
+/**
+ * Provides information about a configured Accumulo Iterator
+ *
+ * @since 2.0.0
+ */
+public interface IteratorConfiguration {
+ String getIteratorClass();
+
+ String getName();
+
+ int getPriority();
- if (sessionA instanceof MultiScanSession && sessionB instanceof ScanSession) {
- if (priority < 0) {
- priority *= -1;
- }
- } else if (sessionB instanceof MultiScanSession && sessionA instanceof ScanSession) {
- if (priority > 0) {
- priority *= -1;
- }
- }
- return priority;
- }
+ Map<String,String> getOptions();
}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java b/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
similarity index 58%
copy from core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
copy to core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
index f688010..a1b9322 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
@@ -14,21 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.accumulo.core.util;
+package org.apache.accumulo.core.spi.common;
-import java.lang.Thread.UncaughtExceptionHandler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+/**
+ * @since 2.0.0
+ */
+public interface Stats {
-public class AccumuloUncaughtExceptionHandler implements UncaughtExceptionHandler {
+ /**
+ * @return the minimum data point seen, or 0 if no data was seen
+ */
+ long min();
- private static final Logger log = LoggerFactory.getLogger(AccumuloUncaughtExceptionHandler.class);
+ /**
+ * @return the maximum data point seen, or 0 if no data was seen
+ */
+ long max();
- @Override
- public void uncaughtException(Thread t, Throwable e) {
+ /**
+ * @return the mean of the data points seen, or {@link Double#NaN} if no data was seen
+ */
+ double mean();
- log.error(String.format("Caught an exception in %s. Shutting down.", t), e);
- }
+ /**
+ * @return the sum of the data points seen, or 0 if no data was seen
+ */
+ long sum();
+ /**
+ * @return the number of data points seen
+ */
+ long num();
}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
new file mode 100644
index 0000000..c567460
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Comparator;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Prioritize scans based on the ratio of runTime/idleTime. Scans with a lower ratio have a higher
+ * priority. When the ratio is equal, the scan with the oldest last run time has the highest
+ * priority. If neither has run, then the oldest gets priority.
+ *
+ * <p>
+ * When computing the ratio, the total idle time is treated as at least 1 so that the division is
+ * never by zero. This prioritizer accepts no options; {@link #createComparator(Map)} rejects a
+ * non-empty options map.
+ *
+ * @since 2.0.0
+ */
+public class IdleRatioScanPrioritizer implements ScanPrioritizer {
+ private static double idleRatio(long currTime, ScanInfo si) {
+ double totalRunTime = si.getRunTimeStats().sum();
+ double totalIdleTime = Math.max(1, si.getIdleTimeStats(currTime).sum());
+ return totalRunTime / totalIdleTime;
+ }
+
+ @Override
+ public Comparator<ScanInfo> createComparator(Map<String,String> options) {
+ Preconditions.checkArgument(options.isEmpty());
+
+ Comparator<ScanInfo> c1 = (si1, si2) -> {
+ long currTime = System.currentTimeMillis();
+ return Double.compare(idleRatio(currTime, si1), idleRatio(currTime, si2));
+ };
+
+ return c1.thenComparingLong(si -> si.getLastRunTime().orElse(0))
+ .thenComparingLong(si -> si.getCreationTime());
+ }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
new file mode 100644
index 0000000..a3bc3f1
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A per table scan dispatcher that decides which executor should be used to process a scan. For
+ * information about configuring, find the documentation for the {@code table.scan.dispatcher} and
+ * {@code table.scan.dispatcher.opts.} properties.
+ *
+ * @since 2.0.0
+ */
+public interface ScanDispatcher {
+ /**
+ * This method is called once after a ScanDispatcher is instantiated.
+ *
+ * @param options
+ * The configured options. For example if the table properties
+ * {@code table.scan.dispatcher.opts.p1=abc} and
+ * {@code table.scan.dispatcher.opts.p9=123} were set, then this map would contain
+ * {@code p1=abc} and {@code p9=123}.
+ */
+ public default void init(Map<String,String> options) {
+ Preconditions.checkArgument(options.isEmpty(), "No options expected");
+ }
+
+ /**
+ * @param scanInfo
+ * Information about the scan.
+ * @param scanExecutors
+ * Information about the currently configured executors.
+ * @return The name of the executor to use. Should be one of the names in
+ * scanExecutors.keySet()
+ */
+ String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors);
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java
new file mode 100644
index 0000000..a55b090
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for obtaining information about a scan executor
+ *
+ * @since 2.0.0
+ */
+public interface ScanExecutor {
+
+ interface Config {
+ /**
+ * @return the unique name used to identify the executor in config
+ */
+ String getName();
+
+ /**
+ * @return the max number of threads that were configured
+ */
+ int getMaxThreads();
+
+ /**
+ * @return the prioritizer that was configured
+ */
+ Optional<String> getPrioritizerClass();
+
+ /**
+ * @return the prioritizer options
+ */
+ Map<String,String> getPrioritizerOptions();
+ }
+
+ /**
+ * @return The number of tasks queued for the executor
+ */
+ int getQueued();
+
+ /**
+ * @return The configuration used to create the executor
+ */
+ Config getConfig();
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java
new file mode 100644
index 0000000..7961c21
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collection;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+
+/**
+ * Provides information about an active Accumulo scan against a tablet. Accumulo scans operate by
+ * repeatedly gathering batches of data and returning those to the client.
+ *
+ * <p>
+ * All times are in milliseconds and obtained using System.currentTimeMillis().
+ *
+ * @since 2.0.0
+ */
+public interface ScanInfo {
+
+ enum Type {
+ /**
+ * A single range scan started using a {@link Scanner}
+ */
+ SINGLE,
+ /**
+ * A multi range scan started using a {@link BatchScanner}
+ */
+ MULTI
+ }
+
+ Type getScanType();
+
+ String getTableId();
+
+ /**
+ * Returns the first time a tablet knew about a scan over its portion of data.
+ */
+ long getCreationTime();
+
+ /**
+ * If the scan has run, returns the last run time.
+ */
+ OptionalLong getLastRunTime();
+
+ /**
+ * Returns timing statistics about running and gathering batches of data.
+ */
+ Stats getRunTimeStats();
+
+ /**
+ * Returns statistics about the time between running. These stats are only about the idle times
+ * before the last run time. The idle time after the last run time is not included. If the scan
+ * has never run, then there are no stats.
+ */
+ Stats getIdleTimeStats();
+
+ /**
+ * This method is similar to {@link #getIdleTimeStats()}, but it also includes the time period
+ * between the last run time and now in the stats. If the scan has never run, then the stats are
+ * computed using only {@code currentTime - creationTime}.
+ */
+ Stats getIdleTimeStats(long currentTime);
+
+ /**
+ * This method returns which columns were fetched by a scan. When a family is fetched, a Column
+ * object where everything but the family is null is in the set.
+ *
+ * <p>
+ * The following example code shows how this method can be used to check if a family was fetched
+ * or a family+qualifier was fetched. If continually checking for the same column, should probably
+ * create a constant.
+ *
+ * <pre>
+ * <code>
+ * boolean wasFamilyFetched(ScanInfo si, byte[] fam) {
+ * Column family = new Column(fam, null, null);
+ * return si.getFetchedColumns().contains(family);
+ * }
+ *
+ * boolean wasColumnFetched(ScanInfo si, byte[] fam, byte[] qual) {
+ * Column col = new Column(fam, qual, null);
+ * return si.getFetchedColumns().contains(col);
+ * }
+ * </code>
+ * </pre>
+ *
+ *
+ * @return The family and family+qualifier pairs fetched.
+ */
+ Set<Column> getFetchedColumns();
+
+ /**
+ * @return iterators that were configured on the client side scanner
+ */
+ Collection<IteratorConfiguration> getClientScanIterators();
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
similarity index 65%
rename from server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
rename to core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
index dcfa1d4..51a4254 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
@@ -14,19 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.accumulo.tserver.session;
+package org.apache.accumulo.core.spi.scan;
import java.util.Comparator;
+import java.util.Map;
-public abstract class SessionComparator implements Comparator<Runnable> {
-
- @Override
- public int compare(Runnable sessionA, Runnable sessionB) {
- if (sessionA instanceof Session && sessionB instanceof Session)
- return compareSession((Session) sessionA, (Session) sessionB);
- else
- return 0;
- }
-
- public abstract int compareSession(final Session sessionA, final Session sessionB);
+/**
+ * A factory for creating comparators used for prioritizing scans. For information about
+ * configuring one, see the documentation for the {@code tserver.scan.executors.} properties.
+ *
+ * @since 2.0.0
+ */
+public interface ScanPrioritizer {
+ Comparator<ScanInfo> createComparator(Map<String,String> options);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
new file mode 100644
index 0000000..96e7d2c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * If no options are given, then this will dispatch to an executor named {@code default}. This
+ * dispatcher supports the following options.
+ *
+ * <UL>
+ * <LI>{@code table.scan.dispatcher.opts.executor=<scan executor name>} : dispatches all scans to
+ * the named executor.</LI>
+ * <LI>{@code table.scan.dispatcher.opts.multi_executor=<scan executor name>} : dispatches batch
+ * scans to the named executor.</LI>
+ * <LI>{@code table.scan.dispatcher.opts.single_executor=<scan executor name>} : dispatches regular
+ * scans to the named executor.</LI>
+ * </UL>
+ *
+ * The {@code multi_executor} and {@code single_executor} options override the {@code executor}
+ * option.
+ */
+
+public class SimpleScanDispatcher implements ScanDispatcher {
+
+ private final Set<String> VALID_OPTS = ImmutableSet.of("executor", "multi_executor",
+ "single_executor");
+ private String multiExecutor;
+ private String singleExecutor;
+
+ public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default";
+
+ @Override
+ public void init(Map<String,String> options) {
+ Set<String> invalidOpts = Sets.difference(options.keySet(), VALID_OPTS);
+ Preconditions.checkArgument(invalidOpts.size() == 0, "Invalid options : %s", invalidOpts);
+
+ String base = options.getOrDefault("executor", DEFAULT_SCAN_EXECUTOR_NAME);
+ multiExecutor = options.getOrDefault("multi_executor", base);
+ singleExecutor = options.getOrDefault("single_executor", base);
+ }
+
+ @Override
+ public String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors) {
+ switch (scanInfo.getScanType()) {
+ case MULTI:
+ return multiExecutor;
+ case SINGLE:
+ return singleExecutor;
+ default:
+ throw new IllegalArgumentException("Unexpected scan type " + scanInfo.getScanType());
+
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
index f688010..ed9c150 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
@@ -27,7 +27,6 @@ public class AccumuloUncaughtExceptionHandler implements UncaughtExceptionHandle
@Override
public void uncaughtException(Thread t, Throwable e) {
-
log.error(String.format("Caught an exception in %s. Shutting down.", t), e);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
index 0b4730c..57d429b 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
@@ -16,6 +16,8 @@
*/
package org.apache.accumulo.core.util;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.OptionalInt;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
@@ -26,20 +28,30 @@ import org.slf4j.LoggerFactory;
public class NamingThreadFactory implements ThreadFactory {
private static final Logger log = LoggerFactory.getLogger(NamingThreadFactory.class);
- private static final AccumuloUncaughtExceptionHandler uncaughtHandler = new AccumuloUncaughtExceptionHandler();
+ private static final UncaughtExceptionHandler UEH = new AccumuloUncaughtExceptionHandler();
private AtomicInteger threadNum = new AtomicInteger(1);
private String name;
+ private OptionalInt priority;
public NamingThreadFactory(String name) {
this.name = name;
+ this.priority = OptionalInt.empty();
+ }
+
+ public NamingThreadFactory(String name, OptionalInt priority) {
+ this.name = name;
+ this.priority = priority;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Daemon(new LoggingRunnable(log, r),
name + " " + threadNum.getAndIncrement());
- thread.setUncaughtExceptionHandler(uncaughtHandler);
+ thread.setUncaughtExceptionHandler(UEH);
+ if (priority.isPresent()) {
+ thread.setPriority(priority.getAsInt());
+ }
return thread;
}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Stat.java b/core/src/main/java/org/apache/accumulo/core/util/Stat.java
index 8bdeb63..704622b 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/Stat.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Stat.java
@@ -16,67 +16,70 @@
*/
package org.apache.accumulo.core.util;
-import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
+import org.apache.accumulo.core.spi.common.Stats;
import org.apache.commons.math3.stat.descriptive.moment.Mean;
-import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
-import org.apache.commons.math3.stat.descriptive.rank.Max;
-import org.apache.commons.math3.stat.descriptive.rank.Min;
-import org.apache.commons.math3.stat.descriptive.summary.Sum;
-public class Stat {
- Min min;
- Max max;
- Sum sum;
+public class Stat implements Stats {
+ long min;
+ long max;
+ long sum;
Mean mean;
- StandardDeviation sd;
-
- StorelessUnivariateStatistic[] stats;
public Stat() {
- min = new Min();
- max = new Max();
- sum = new Sum();
mean = new Mean();
- sd = new StandardDeviation();
-
- stats = new StorelessUnivariateStatistic[] {min, max, sum, mean, sd};
+ clear();
}
public void addStat(long stat) {
- for (StorelessUnivariateStatistic statistic : stats) {
- statistic.increment(stat);
- }
+ min = Math.min(min, stat);
+ max = Math.max(max, stat);
+ sum += stat;
+ mean.increment(stat);
}
- public long getMin() {
- return (long) min.getResult();
+ @Override
+ public long min() {
+ return num() == 0 ? 0L : min;
}
- public long getMax() {
- return (long) max.getResult();
+ @Override
+ public long max() {
+ return num() == 0 ? 0L : max;
}
- public long getSum() {
- return (long) sum.getResult();
+ @Override
+ public long sum() {
+ return sum;
}
- public double getAverage() {
+ @Override
+ public double mean() {
return mean.getResult();
}
- public double getStdDev() {
- return sd.getResult();
- }
-
@Override
public String toString() {
- return String.format("%,d %,d %,.2f %,d", getMin(), getMax(), getAverage(), mean.getN());
+ return String.format("%,d %,d %,.2f %,d", min(), max(), mean(), mean.getN());
}
public void clear() {
- for (StorelessUnivariateStatistic statistic : stats) {
- statistic.clear();
- }
+ min = Long.MAX_VALUE;
+ max = Long.MIN_VALUE;
+ sum = 0;
+ mean.clear();
+ }
+
+ @Override
+ public long num() {
+ return mean.getN();
}
+ public Stat copy() {
+ Stat stat = new Stat();
+ stat.min = this.min;
+ stat.max = this.max;
+ stat.sum = this.sum;
+ stat.mean = this.mean.copy();
+ return stat;
+ }
}
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
index 9f77834..0fcfebd 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
@@ -21,11 +21,15 @@ import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Predicate;
+import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -131,6 +135,15 @@ public class AccumuloConfigurationTest {
private HashMap<String,String> props = new HashMap<>();
private int upCount = 0;
+ private AccumuloConfiguration parent;
+
+ TestConfiguration() {
+ parent = null;
+ }
+
+ TestConfiguration(AccumuloConfiguration parent) {
+ this.parent = parent;
+ }
public void set(String p, String v) {
props.put(p, v);
@@ -138,17 +151,29 @@ public class AccumuloConfigurationTest {
}
@Override
+ public boolean isPropertySet(Property prop) {
+ return props.containsKey(prop.getKey());
+ }
+
+ @Override
public long getUpdateCount() {
return upCount;
}
@Override
public String get(Property property) {
- return props.get(property.getKey());
+ String v = props.get(property.getKey());
+ if (v == null & parent != null) {
+ v = parent.get(property);
+ }
+ return v;
}
@Override
public void getProperties(Map<String,String> output, Predicate<String> filter) {
+ if (parent != null) {
+ parent.getProperties(output, filter);
+ }
for (Entry<String,String> entry : props.entrySet()) {
if (filter.test(entry.getKey())) {
output.put(entry.getKey(), entry.getValue());
@@ -267,4 +292,89 @@ public class AccumuloConfigurationTest {
Map<String,String> pmL = tc.getAllPropertiesWithPrefix(Property.TABLE_ITERATOR_SCAN_PREFIX);
assertSame(pmG, pmL);
}
+
+ @Test
+ public void testScanExecutors() {
+ String defName = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME;
+
+ TestConfiguration tc = new TestConfiguration(DefaultConfiguration.getInstance());
+
+ Collection<ScanExecutorConfig> executors = tc.getScanExecutors();
+
+ Assert.assertEquals(2, executors.size());
+
+ ScanExecutorConfig sec = executors.stream().filter(c -> c.name.equals(defName)).findFirst()
+ .get();
+ Assert.assertEquals(
+ Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+ sec.maxThreads);
+ Assert.assertFalse(sec.priority.isPresent());
+ Assert.assertTrue(sec.prioritizerClass.get().isEmpty());
+ Assert.assertTrue(sec.prioritizerOpts.isEmpty());
+
+ // ensure the deprecated prop is read if nothing else is set
+ tc.set("tserver.readahead.concurrent.max", "6");
+ Assert.assertEquals(6, sec.getCurrentMaxThreads());
+ Assert.assertEquals(
+ Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+ sec.maxThreads);
+ ScanExecutorConfig sec2 = tc.getScanExecutors().stream().filter(c -> c.name.equals(defName))
+ .findFirst().get();
+ Assert.assertEquals(6, sec2.maxThreads);
+
+ // ensure new prop overrides deprecated prop
+ tc.set(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey(), "9");
+ Assert.assertEquals(9, sec.getCurrentMaxThreads());
+ Assert.assertEquals(
+ Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+ sec.maxThreads);
+ ScanExecutorConfig sec3 = tc.getScanExecutors().stream().filter(c -> c.name.equals(defName))
+ .findFirst().get();
+ Assert.assertEquals(9, sec3.maxThreads);
+
+ ScanExecutorConfig sec4 = executors.stream().filter(c -> c.name.equals("meta")).findFirst()
+ .get();
+ Assert.assertEquals(
+ Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_META_THREADS.getDefaultValue()),
+ sec4.maxThreads);
+ Assert.assertFalse(sec4.priority.isPresent());
+ Assert.assertFalse(sec4.prioritizerClass.isPresent());
+ Assert.assertTrue(sec4.prioritizerOpts.isEmpty());
+
+ tc.set("tserver.metadata.readahead.concurrent.max", "2");
+ Assert.assertEquals(2, sec4.getCurrentMaxThreads());
+ ScanExecutorConfig sec5 = tc.getScanExecutors().stream().filter(c -> c.name.equals("meta"))
+ .findFirst().get();
+ Assert.assertEquals(2, sec5.maxThreads);
+
+ tc.set(Property.TSERV_SCAN_EXECUTORS_META_THREADS.getKey(), "3");
+ Assert.assertEquals(3, sec4.getCurrentMaxThreads());
+ ScanExecutorConfig sec6 = tc.getScanExecutors().stream().filter(c -> c.name.equals("meta"))
+ .findFirst().get();
+ Assert.assertEquals(3, sec6.maxThreads);
+
+ String prefix = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey();
+ tc.set(prefix + "hulksmash.threads", "66");
+ tc.set(prefix + "hulksmash.priority", "3");
+ tc.set(prefix + "hulksmash.prioritizer", "com.foo.ScanPrioritizer");
+ tc.set(prefix + "hulksmash.prioritizer.opts.k1", "v1");
+ tc.set(prefix + "hulksmash.prioritizer.opts.k2", "v3");
+
+ executors = tc.getScanExecutors();
+ Assert.assertEquals(3, executors.size());
+ ScanExecutorConfig sec7 = executors.stream().filter(c -> c.name.equals("hulksmash")).findFirst()
+ .get();
+ Assert.assertEquals(66, sec7.maxThreads);
+ Assert.assertEquals(3, sec7.priority.getAsInt());
+ Assert.assertEquals("com.foo.ScanPrioritizer", sec7.prioritizerClass.get());
+ Assert.assertEquals(ImmutableMap.of("k1", "v1", "k2", "v3"), sec7.prioritizerOpts);
+
+ tc.set(prefix + "hulksmash.threads", "44");
+ Assert.assertEquals(66, sec7.maxThreads);
+ Assert.assertEquals(44, sec7.getCurrentMaxThreads());
+
+ ScanExecutorConfig sec8 = tc.getScanExecutors().stream().filter(c -> c.name.equals("hulksmash"))
+ .findFirst().get();
+ Assert.assertEquals(44, sec8.maxThreads);
+ }
}
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
new file mode 100644
index 0000000..0128b20
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IdleRatioScanPrioritizerTest {
+
+ @Test
+ public void testSort() {
+ long now = System.currentTimeMillis();
+
+ List<TestScanInfo> scans = new ArrayList<>();
+
+ // Two following have never run, so oldest should go first
+ scans.add(new TestScanInfo("a", Type.SINGLE, now - 3));
+ scans.add(new TestScanInfo("b", Type.SINGLE, now - 8));
+ // Two following have different idle ratio and same last run times
+ scans.add(new TestScanInfo("c", Type.SINGLE, now - 16, 2, 10));
+ scans.add(new TestScanInfo("d", Type.SINGLE, now - 16, 5, 10));
+ // Two following have same idle ratio and different last run times
+ scans.add(new TestScanInfo("e", Type.SINGLE, now - 12, 5, 9));
+ scans.add(new TestScanInfo("f", Type.SINGLE, now - 12, 3, 7));
+
+ Collections.shuffle(scans);
+
+ Comparator<ScanInfo> comparator = new IdleRatioScanPrioritizer()
+ .createComparator(Collections.emptyMap());
+
+ Collections.sort(scans, comparator);
+
+ Assert.assertEquals("b", scans.get(0).testId);
+ Assert.assertEquals("a", scans.get(1).testId);
+ Assert.assertEquals("f", scans.get(2).testId);
+ Assert.assertEquals("e", scans.get(3).testId);
+ Assert.assertEquals("d", scans.get(4).testId);
+ Assert.assertEquals("c", scans.get(5).testId);
+ }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
new file mode 100644
index 0000000..b59858a
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class SimpleScanDispatcherTest {
+ @Test
+ public void testProps() {
+ Assert.assertTrue(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey()
+ .endsWith(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME + ".threads"));
+ Assert.assertTrue(Property.TSERV_SCAN_EXECUTORS_DEFAULT_PRIORITIZER.getKey()
+ .endsWith(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME + ".prioritizer"));
+ }
+
+ private void runTest(Map<String,String> opts, String expectedSingle, String expectedMulti) {
+ ScanInfo msi = new TestScanInfo("a", Type.MULTI, 4);
+ ScanInfo ssi = new TestScanInfo("a", Type.SINGLE, 4);
+
+ SimpleScanDispatcher ssd1 = new SimpleScanDispatcher();
+ ssd1.init(opts);
+ Assert.assertEquals(expectedMulti, ssd1.dispatch(msi, null));
+ Assert.assertEquals(expectedSingle, ssd1.dispatch(ssi, null));
+ }
+
+ @Test
+ public void testBasic() {
+ String dname = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME;
+
+ runTest(Collections.emptyMap(), dname, dname);
+ runTest(ImmutableMap.of("executor", "E1"), "E1", "E1");
+ runTest(ImmutableMap.of("single_executor", "E2"), "E2", dname);
+ runTest(ImmutableMap.of("multi_executor", "E3"), dname, "E3");
+ runTest(ImmutableMap.of("executor", "E1", "single_executor", "E2"), "E2", "E1");
+ runTest(ImmutableMap.of("executor", "E1", "multi_executor", "E3"), "E1", "E3");
+ runTest(ImmutableMap.of("single_executor", "E2", "multi_executor", "E3"), "E2", "E3");
+ runTest(ImmutableMap.of("executor", "E1", "single_executor", "E2", "multi_executor", "E3"),
+ "E2", "E3");
+ }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
new file mode 100644
index 0000000..68a9650
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collection;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+import org.apache.accumulo.core.util.Stat;
+
+public class TestScanInfo implements ScanInfo {
+
+ String testId;
+ Type scanType;
+ long creationTime;
+ OptionalLong lastRunTime = OptionalLong.empty();
+ Stat runTimeStats = new Stat();
+ Stat idleTimeStats = new Stat();
+
+ TestScanInfo(String testId, Type scanType, long creationTime, int... times) {
+ this.testId = testId;
+ this.scanType = scanType;
+ this.creationTime = creationTime;
+
+ for (int i = 0; i < times.length; i += 2) {
+ long idleDuration = times[i] - (i == 0 ? 0 : times[i - 1]);
+ long runDuration = times[i + 1] - times[i];
+ runTimeStats.addStat(runDuration);
+ idleTimeStats.addStat(idleDuration);
+ }
+
+ if (times.length > 0) {
+ lastRunTime = OptionalLong.of(times[times.length - 1] + creationTime);
+ }
+ }
+
+ @Override
+ public Type getScanType() {
+ return scanType;
+ }
+
+ @Override
+ public String getTableId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ @Override
+ public OptionalLong getLastRunTime() {
+ return lastRunTime;
+ }
+
+ @Override
+ public Stats getRunTimeStats() {
+ return runTimeStats;
+ }
+
+ @Override
+ public Stats getIdleTimeStats() {
+ return idleTimeStats;
+ }
+
+ @Override
+ public Stats getIdleTimeStats(long currentTime) {
+ Stat copy = idleTimeStats.copy();
+ copy.addStat(currentTime - lastRunTime.orElse(creationTime));
+ return copy;
+ }
+
+ @Override
+ public Set<Column> getFetchedColumns() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<IteratorConfiguration> getClientScanIterators() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/util/StatTest.java b/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
index 69df38d..a2986ac 100644
--- a/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
@@ -43,32 +43,26 @@ public class StatTest {
@Test
public void testGetMin() {
- assertEquals(0, zero.getMin());
- assertEquals(3677, stat.getMin());
+ assertEquals(0, zero.min());
+ assertEquals(3677, stat.min());
}
@Test
public void testGetMax() {
- assertEquals(0, zero.getMax());
- assertEquals(9792, stat.getMax());
+ assertEquals(0, zero.max());
+ assertEquals(9792, stat.max());
}
@Test
public void testGetAverage() {
- assertEquals(0, zero.getAverage(), delta);
- assertEquals(5529, stat.getAverage(), delta);
- }
-
- @Test
- public void testGetStdDev() {
- assertEquals(0, zero.getStdDev(), delta);
- assertEquals(2073.7656569632, stat.getStdDev(), delta);
+ assertEquals(0, zero.mean(), delta);
+ assertEquals(5529, stat.mean(), delta);
}
@Test
public void testGetSum() {
- assertEquals(0, zero.getSum());
- assertEquals(38703, stat.getSum());
+ assertEquals(0, zero.sum());
+ assertEquals(38703, stat.sum());
}
@Test
@@ -76,16 +70,14 @@ public class StatTest {
zero.clear();
stat.clear();
- assertEquals(0, zero.getMax());
- assertEquals(zero.getMax(), stat.getMax());
- assertEquals(0, zero.getMin());
- assertEquals(zero.getMin(), stat.getMin());
- assertEquals(0, zero.getSum());
- assertEquals(zero.getSum(), stat.getSum());
+ assertEquals(0, zero.max());
+ assertEquals(zero.max(), stat.max());
+ assertEquals(0, zero.min());
+ assertEquals(zero.min(), stat.min());
+ assertEquals(0, zero.sum());
+ assertEquals(zero.sum(), stat.sum());
- assertEquals(Double.NaN, zero.getAverage(), 0);
- assertEquals(zero.getAverage(), stat.getAverage(), 0);
- assertEquals(Double.NaN, zero.getStdDev(), 0);
- assertEquals(zero.getStdDev(), stat.getStdDev(), 0);
+ assertEquals(Double.NaN, zero.mean(), 0);
+ assertEquals(zero.mean(), stat.mean(), 0);
}
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index dbb7ca4..cc67feb 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.server.conf;
import static java.util.Objects.requireNonNull;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.thrift.IterInfo;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
@@ -216,4 +218,39 @@ public class TableConfiguration extends ObservableConfiguration {
return pic;
}
+
+ public static class TablesScanDispatcher {
+ public final ScanDispatcher dispatcher;
+ public final long count;
+
+ public TablesScanDispatcher(ScanDispatcher dispatcher, long count) {
+ this.dispatcher = dispatcher;
+ this.count = count;
+ }
+ }
+
+ private AtomicReference<TablesScanDispatcher> scanDispatcherRef = new AtomicReference<>();
+
+ public ScanDispatcher getScanDispatcher() {
+ long count = getUpdateCount();
+ TablesScanDispatcher currRef = scanDispatcherRef.get();
+ if (currRef == null || currRef.count != count) {
+ ScanDispatcher newDispatcher = Property.createTableInstanceFromPropertyName(this,
+ Property.TABLE_SCAN_DISPATCHER, ScanDispatcher.class, null);
+
+ Map<String,String> opts = new HashMap<>();
+ getAllPropertiesWithPrefix(Property.TABLE_SCAN_DISPATCHER_OPTS).forEach((k, v) -> {
+ String optKey = k.substring(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey().length());
+ opts.put(optKey, v);
+ });
+
+ newDispatcher.init(Collections.unmodifiableMap(opts));
+
+ TablesScanDispatcher newRef = new TablesScanDispatcher(newDispatcher, count);
+ scanDispatcherRef.compareAndSet(currRef, newRef);
+ currRef = newRef;
+ }
+
+ return currRef.dispatcher;
+ }
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
index 1973fd8..ab41141 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
@@ -81,11 +81,12 @@ public class ZooConfiguration extends AccumuloConfiguration {
@Override
public String get(Property property) {
if (Property.isFixedZooPropertyKey(property)) {
- if (fixedProps.containsKey(property.getKey())) {
- return fixedProps.get(property.getKey());
+ String val = fixedProps.get(property.getKey());
+ if (val != null) {
+ return val;
} else {
synchronized (fixedProps) {
- String val = _get(property);
+ val = _get(property);
fixedProps.put(property.getKey(), val);
return val;
}
@@ -96,6 +97,12 @@ public class ZooConfiguration extends AccumuloConfiguration {
}
}
+ @Override
+ public boolean isPropertySet(Property prop) {
+ return fixedProps.containsKey(prop.getKey()) || getRaw(prop.getKey()) != null
+ || parent.isPropertySet(prop);
+ }
+
private String getRaw(String key) {
String zPath = propPathPrefix + "/" + key;
byte[] v = propCache.get(zPath);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 6e67cf0..14c0306 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -130,6 +130,7 @@ import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
import org.apache.accumulo.core.summary.Gatherer;
import org.apache.accumulo.core.summary.Gatherer.FileSystemResolver;
import org.apache.accumulo.core.summary.SummaryCollection;
@@ -250,9 +251,9 @@ import org.apache.accumulo.tserver.scan.NextBatchTask;
import org.apache.accumulo.tserver.scan.ScanRunState;
import org.apache.accumulo.tserver.session.ConditionalSession;
import org.apache.accumulo.tserver.session.MultiScanSession;
-import org.apache.accumulo.tserver.session.ScanSession;
import org.apache.accumulo.tserver.session.Session;
import org.apache.accumulo.tserver.session.SessionManager;
+import org.apache.accumulo.tserver.session.SingleScanSession;
import org.apache.accumulo.tserver.session.SummarySession;
import org.apache.accumulo.tserver.session.UpdateSession;
import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner;
@@ -543,6 +544,16 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
}
+ private ScanDispatcher getScanDispatcher(KeyExtent extent) {
+ if (extent.isRootTablet() || extent.isMeta()) {
+ // dispatcher is only for user tables
+ return null;
+ }
+
+ return getServerConfigurationFactory().getTableConfiguration(extent.getTableId())
+ .getScanDispatcher();
+ }
+
@Override
public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent,
TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList,
@@ -587,13 +598,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
if (tablet == null)
throw new NotServingTabletException(textent);
- Set<Column> columnSet = new HashSet<>();
+ HashSet<Column> columnSet = new HashSet<>();
for (TColumn tcolumn : columns) {
columnSet.add(new Column(tcolumn));
}
- final ScanSession scanSession = new ScanSession(credentials, extent, columnSet, ssiList, ssio,
- new Authorizations(authorizations), readaheadThreshold, batchTimeOut, context);
+ final SingleScanSession scanSession = new SingleScanSession(credentials, extent, columnSet,
+ ssiList, ssio, new Authorizations(authorizations), readaheadThreshold, batchTimeOut,
+ context);
scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet,
scanSession.auths, ssiList, ssio, isolated, scanSession.interruptFlag,
SamplerConfigurationImpl.fromThrift(tSamplerConfig), scanSession.batchTimeOut,
@@ -619,7 +631,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
throws NoSuchScanIDException, NotServingTabletException,
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
TSampleNotPresentException {
- ScanSession scanSession = (ScanSession) sessionManager.reserveSession(scanID);
+ SingleScanSession scanSession = (SingleScanSession) sessionManager.reserveSession(scanID);
if (scanSession == null) {
throw new NoSuchScanIDException();
}
@@ -631,7 +643,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
}
- private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession)
+ private ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession scanSession)
throws NoSuchScanIDException, NotServingTabletException,
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
TSampleNotPresentException {
@@ -639,7 +651,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
if (scanSession.nextBatchTask == null) {
scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID,
scanSession.interruptFlag);
- resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
+ resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent),
+ scanSession, scanSession.nextBatchTask);
}
ScanBatch bresult;
@@ -694,7 +707,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
// to client
scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID,
scanSession.interruptFlag);
- resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
+ resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent),
+ scanSession, scanSession.nextBatchTask);
}
if (!scanResult.more)
@@ -705,14 +719,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
@Override
public void closeScan(TInfo tinfo, long scanID) {
- final ScanSession ss = (ScanSession) sessionManager.removeSession(scanID);
+ final SingleScanSession ss = (SingleScanSession) sessionManager.removeSession(scanID);
if (ss != null) {
long t2 = System.currentTimeMillis();
if (log.isTraceEnabled()) {
log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ",
TServerUtils.clientAddress.get(), ss.extent.getTableId(), ss.entriesReturned,
- (t2 - ss.startTime) / 1000.0, ss.nbTimes.toString()));
+ (t2 - ss.startTime) / 1000.0, ss.runStats.toString()));
}
if (scanMetrics.isEnabled()) {
@@ -818,7 +832,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
if (session.lookupTask == null) {
session.lookupTask = new LookupTask(TabletServer.this, scanID);
- resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
+ resourceManager.executeReadAhead(session.threadPoolExtent,
+ getScanDispatcher(session.threadPoolExtent), session, session.lookupTask);
}
try {
@@ -1174,8 +1189,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)",
TServerUtils.clientAddress.get(), us.totalUpdates,
(System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(),
- us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
- us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0));
+ us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0, us.walogTimes.sum() / 1000.0,
+ us.commitTimes.sum() / 1000.0));
}
if (us.failures.size() > 0) {
Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 9a91e87..5ab6e27 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -21,10 +21,15 @@ import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
@@ -36,12 +41,13 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.NamespaceNotFoundException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
@@ -50,6 +56,11 @@ import org.apache.accumulo.core.file.blockfile.cache.CacheType;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanExecutor;
+import org.apache.accumulo.core.spi.scan.ScanInfo;
+import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.fate.util.LoggingRunnable;
@@ -67,15 +78,18 @@ import org.apache.accumulo.tserver.compaction.CompactionStrategy;
import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
-import org.apache.accumulo.tserver.session.SessionComparator;
+import org.apache.accumulo.tserver.session.ScanSession;
import org.apache.accumulo.tserver.tablet.Tablet;
import org.apache.htrace.wrappers.TraceExecutorService;
+import org.apache.htrace.wrappers.TraceRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
/**
* ResourceManager is responsible for managing the resources of all tablets within a tablet server.
@@ -94,14 +108,13 @@ public class TabletServerResourceManager {
private final ExecutorService migrationPool;
private final ExecutorService assignmentPool;
private final ExecutorService assignMetaDataPool;
- private final ExecutorService readAheadThreadPool;
- private final ExecutorService defaultReadAheadThreadPool;
private final ExecutorService summaryRetrievalPool;
private final ExecutorService summaryParitionPool;
private final ExecutorService summaryRemotePool;
private final Map<String,ExecutorService> threadPools = new TreeMap<>();
- private final Map<String,ExecutorService> tableThreadPools = new TreeMap<>();
+ private final Map<String,ExecutorService> scanExecutors;
+ private final Map<String,ScanExecutor> scanExecutorChoices;
private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments;
@@ -128,46 +141,15 @@ public class TabletServerResourceManager {
return tp;
}
- private ExecutorService addEs(final Property maxThreads, String name,
- final ThreadPoolExecutor tp) {
+ private ExecutorService addEs(IntSupplier maxThreads, String name, final ThreadPoolExecutor tp) {
ExecutorService result = addEs(name, tp);
SimpleTimer.getInstance(tserver.getConfiguration()).schedule(new Runnable() {
@Override
public void run() {
try {
- int max = tserver.getConfiguration().getCount(maxThreads);
+ int max = maxThreads.getAsInt();
if (tp.getMaximumPoolSize() != max) {
- log.info("Changing {} to {}", maxThreads.getKey(), max);
- tp.setCorePoolSize(max);
- tp.setMaximumPoolSize(max);
- }
- } catch (Throwable t) {
- log.error("Failed to change thread pool size", t);
- }
- }
-
- }, 1000, 10 * 1000);
- return result;
- }
-
- private ExecutorService addEs(final int maxThreads, final Property prefix,
- final String propertyName, String name, final ThreadPoolExecutor tp) {
- ExecutorService result = addEs(name, tp);
- SimpleTimer.getInstance(tserver.getConfiguration()).schedule(new Runnable() {
- @Override
- public void run() {
- try {
- int max = maxThreads;
- for (Entry<String,String> entry : conf.getSystemConfiguration()
- .getAllPropertiesWithPrefix(prefix).entrySet()) {
- if (entry.getKey().equals(propertyName)) {
- if (null != entry.getValue() && entry.getValue().length() != 0)
- max = Integer.parseInt(entry.getValue());
- break;
- }
- }
- if (tp.getMaximumPoolSize() != max) {
- log.info("Changing {} to {}", maxThreads, max);
+ log.info("Changing max threads for {} to {}", name, max);
tp.setCorePoolSize(max);
tp.setMaximumPoolSize(max);
}
@@ -187,7 +169,7 @@ public class TabletServerResourceManager {
ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, timeout, timeUnit, queue,
new NamingThreadFactory(name));
tp.allowCoreThreadTimeOut(true);
- return addEs(max, name, tp);
+ return addEs(() -> conf.getSystemConfiguration().getCount(max), name, tp);
}
private ExecutorService createEs(int max, String name) {
@@ -198,83 +180,52 @@ public class TabletServerResourceManager {
return createEs(max, name, new LinkedBlockingQueue<>());
}
- private ExecutorService createEs(int maxThreads, Property prefix, String propertyName,
- String name, BlockingQueue<Runnable> queue) {
- ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
- TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
- return addEs(maxThreads, prefix, propertyName, name, tp);
- }
+ private ExecutorService createPriorityExecutor(ScanExecutorConfig sec,
+ Map<String,Queue<?>> scanExecQueues) {
- /**
- * If we cannot instantiate the comparator we will default to the linked blocking queue comparator
- *
- * @param max
- * max number of threads
- * @param comparator
- * comparator property
- * @param name
- * name passed to the thread factory
- * @return priority executor
- */
- private ExecutorService createPriorityExecutor(Property prefix, String propertyName,
- final int maxThreads, Property comparator, String name) {
-
- String comparatorClazz = conf.getSystemConfiguration().get(comparator);
+ BlockingQueue<Runnable> queue;
- if (null == comparatorClazz || comparatorClazz.length() == 0) {
- log.debug("Using no comparator");
- return createEs(maxThreads, prefix, propertyName, name, new LinkedBlockingQueue<>());
+ if (sec.prioritizerClass.orElse("").isEmpty()) {
+ queue = new LinkedBlockingQueue<>();
} else {
- SessionComparator comparatorObj = Property.createInstanceFromPropertyName(
- conf.getSystemConfiguration(), comparator, SessionComparator.class, null);
- if (null != comparatorObj) {
- log.debug("Using priority based scheduler {}", comparatorClazz);
- return createEs(maxThreads, prefix, propertyName, name,
- new PriorityBlockingQueue<>(maxThreads, comparatorObj));
- } else {
- log.debug("Using no comparator");
- return createEs(maxThreads, prefix, propertyName, name, new LinkedBlockingQueue<>());
+ ScanPrioritizer factory = null;
+ try {
+ factory = ConfigurationTypeHelper.getClassInstance(null, sec.prioritizerClass.get(),
+ ScanPrioritizer.class);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- }
- }
- /**
- * If we cannot instantiate the comparator we will default to the linked blocking queue comparator
- *
- * @param max
- * max number of threads
- * @param comparator
- * comparator property
- * @param name
- * name passed to the thread factory
- * @return priority executor
- */
- private ExecutorService createPriorityExecutor(Property max, Property comparator, String name) {
- int maxThreads = conf.getSystemConfiguration().getCount(max);
+ if (factory == null) {
+ queue = new LinkedBlockingQueue<>();
+ } else {
+ Comparator<ScanInfo> comparator = factory.createComparator(sec.prioritizerOpts);
- String comparatorClazz = conf.getSystemConfiguration().get(comparator);
+ // function to extract the scan session from a runnable
+ Function<Runnable,ScanInfo> extractor = r -> ((ScanSession.ScanMeasurer) ((TraceRunnable) r)
+ .getRunnable()).getScanInfo();
- if (null == comparatorClazz || comparatorClazz.length() == 0) {
- log.debug("Using no comparator");
- return createEs(max, name, new LinkedBlockingQueue<>());
- } else {
- SessionComparator comparatorObj = Property.createInstanceFromPropertyName(
- conf.getSystemConfiguration(), comparator, SessionComparator.class, null);
- if (null != comparatorObj) {
- log.debug("Using priority based scheduler {}", comparatorClazz);
- return createEs(max, name, new PriorityBlockingQueue<>(maxThreads, comparatorObj));
- } else {
- log.debug("Using no comparator");
- return createEs(max, name, new LinkedBlockingQueue<>());
+ queue = new PriorityBlockingQueue<>(sec.maxThreads,
+ Comparator.comparing(extractor, comparator));
}
}
+
+ scanExecQueues.put(sec.name, queue);
+
+ return createEs(() -> sec.getCurrentMaxThreads(), "scan-" + sec.name, queue, sec.priority);
}
- private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
- int maxThreads = conf.getSystemConfiguration().getCount(max);
+ private ExecutorService createEs(IntSupplier maxThreadsSupplier, String name,
+ BlockingQueue<Runnable> queue, OptionalInt priority) {
+ int maxThreads = maxThreadsSupplier.getAsInt();
ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
- TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
- return addEs(max, name, tp);
+ TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name, priority));
+ return addEs(maxThreadsSupplier, name, tp);
+ }
+
+ private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
+ IntSupplier maxThreadsSupplier = () -> conf.getSystemConfiguration().getCount(max);
+ return createEs(maxThreadsSupplier, name, queue, OptionalInt.empty());
}
private ExecutorService createEs(int min, int max, int timeout, String name) {
@@ -282,33 +233,78 @@ public class TabletServerResourceManager {
new LinkedBlockingQueue<>(), new NamingThreadFactory(name)));
}
- /**
- * Creates table specific thread pool for executing scan threads
- *
- * @param instance
- * ZK instance.
- * @param acuConf
- * accumulo configuration.
- * @throws NamespaceNotFoundException
- * Error thrown by tables.getTableId when a name space is not found.
- * @throws TableNotFoundException
- * Error thrown by tables.getTableId when a table is not found.
- */
- protected void createTablePools(Instance instance, AccumuloConfiguration acuConf)
- throws NamespaceNotFoundException, TableNotFoundException {
- for (Entry<String,String> entry : acuConf
- .getAllPropertiesWithPrefix(Property.TSERV_READ_AHEAD_PREFIX).entrySet()) {
- final String tableName = entry.getKey()
- .substring(Property.TSERV_READ_AHEAD_PREFIX.getKey().length());
- if (null == entry.getValue() || entry.getValue().length() == 0) {
- throw new RuntimeException("Read ahead prefix is inproperly configured");
+ protected Map<String,ExecutorService> createScanExecutors(Instance instance,
+ Collection<ScanExecutorConfig> scanExecCfg, Map<String,Queue<?>> scanExecQueues) {
+ Builder<String,ExecutorService> builder = ImmutableMap.builder();
+
+ for (ScanExecutorConfig sec : scanExecCfg) {
+ builder.put(sec.name, createPriorityExecutor(sec, scanExecQueues));
+ }
+
+ return builder.build();
+ }
+
+ private static class ScanExecutorImpl implements ScanExecutor {
+
+ private static class ConfigImpl implements ScanExecutor.Config {
+
+ final ScanExecutorConfig cfg;
+
+ public ConfigImpl(ScanExecutorConfig sec) {
+ this.cfg = sec;
+ }
+
+ @Override
+ public String getName() {
+ return cfg.name;
}
- final int maxThreads = Integer.parseInt(entry.getValue());
- final String tableId = Tables.getTableId(instance, tableName).canonicalID();
- tableThreadPools.put(tableId,
- createPriorityExecutor(Property.TSERV_READ_AHEAD_PREFIX, entry.getKey(), maxThreads,
- Property.TSERV_SESSION_COMPARATOR_CLASS, tableName + " specific read ahead"));
+
+ @Override
+ public int getMaxThreads() {
+ return cfg.maxThreads;
+ }
+
+ @Override
+ public Optional<String> getPrioritizerClass() {
+ return cfg.prioritizerClass;
+ }
+
+ @Override
+ public Map<String,String> getPrioritizerOptions() {
+ return cfg.prioritizerOpts;
+ }
+
}
+
+ private final ConfigImpl config;
+ private final Queue<?> queue;
+
+ ScanExecutorImpl(ScanExecutorConfig sec, Queue<?> q) {
+ this.config = new ConfigImpl(sec);
+ this.queue = q;
+ }
+
+ @Override
+ public int getQueued() {
+ return queue.size();
+ }
+
+ @Override
+ public Config getConfig() {
+ return config;
+ }
+
+ }
+
+ private Map<String,ScanExecutor> createScanExecutorChoices(
+ Collection<ScanExecutorConfig> scanExecCfg, Map<String,Queue<?>> scanExecQueues) {
+ Builder<String,ScanExecutor> builder = ImmutableMap.builder();
+
+ for (ScanExecutorConfig sec : scanExecCfg) {
+ builder.put(sec.name, new ScanExecutorImpl(sec, scanExecQueues.get(sec.name)));
+ }
+
+ return builder.build();
}
public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) {
@@ -390,11 +386,6 @@ public class TabletServerResourceManager {
activeAssignments = new ConcurrentHashMap<>();
- readAheadThreadPool = createPriorityExecutor(Property.TSERV_READ_AHEAD_MAXCONCURRENT,
- Property.TSERV_SESSION_COMPARATOR_CLASS, "tablet read ahead");
- defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT,
- "metadata tablets read ahead");
-
summaryRetrievalPool = createIdlingEs(Property.TSERV_SUMMARY_RETRIEVAL_THREADS,
"summary file retriever", 60, TimeUnit.SECONDS);
summaryRemotePool = createIdlingEs(Property.TSERV_SUMMARY_REMOTE_THREADS, "summary remote", 60,
@@ -402,13 +393,11 @@ public class TabletServerResourceManager {
summaryParitionPool = createIdlingEs(Property.TSERV_SUMMARY_PARTITION_THREADS,
"summary partition", 60, TimeUnit.SECONDS);
- try {
- createTablePools(tserver.getInstance(), acuConf);
- } catch (NamespaceNotFoundException e) {
- throw new RuntimeException(e);
- } catch (TableNotFoundException e) {
- throw new RuntimeException(e);
- }
+ Collection<ScanExecutorConfig> scanExecCfg = acuConf.getScanExecutors();
+ Map<String,Queue<?>> scanExecQueues = new HashMap<>();
+ scanExecutors = createScanExecutors(tserver.getInstance(), scanExecCfg, scanExecQueues);
+ scanExecutorChoices = createScanExecutorChoices(scanExecCfg, scanExecQueues);
+
int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
Cache<String,Long> fileLenCache = CacheBuilder.newBuilder()
@@ -938,16 +927,29 @@ public class TabletServerResourceManager {
}
}
- public void executeReadAhead(KeyExtent tablet, Runnable task) {
- ExecutorService service = tableThreadPools.get(tablet.getTableId().canonicalID());
- if (null != service) {
- service.execute(task);
- } else if (tablet.isRootTablet()) {
+ public void executeReadAhead(KeyExtent tablet, ScanDispatcher dispatcher, ScanSession scanInfo,
+ Runnable task) {
+
+ task = ScanSession.wrap(scanInfo, task);
+
+ if (tablet.isRootTablet()) {
task.run();
} else if (tablet.isMeta()) {
- defaultReadAheadThreadPool.execute(task);
+ scanExecutors.get("meta").execute(task);
} else {
- readAheadThreadPool.execute(task);
+ String scanExecutorName = dispatcher.dispatch(scanInfo, scanExecutorChoices);
+ ExecutorService executor = scanExecutors.get(scanExecutorName);
+ if (executor == null) {
+ log.warn(
+ "For table id {}, {} dispatched to non-existant executor {} Using default executor.",
+ tablet.getTableId(), dispatcher.getClass().getName(), scanExecutorName);
+ executor = scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME);
+ } else if ("meta".equals(scanExecutorName)) {
+ log.warn("For table id {}, {} dispatched to meta executor. Using default executor.",
+ tablet.getTableId(), dispatcher.getClass().getName());
+ executor = scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME);
+ }
+ executor.execute(task);
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
index aee4477..16b9c55 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
@@ -23,7 +23,7 @@ import org.apache.accumulo.core.iterators.IterationInterruptedException;
import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.TooManyFilesException;
-import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.SingleScanSession;
import org.apache.accumulo.tserver.tablet.ScanBatch;
import org.apache.accumulo.tserver.tablet.Tablet;
import org.apache.accumulo.tserver.tablet.TabletClosedException;
@@ -48,7 +48,7 @@ public class NextBatchTask extends ScanTask<ScanBatch> {
@Override
public void run() {
- final ScanSession scanSession = (ScanSession) server.getSession(scanID);
+ final SingleScanSession scanSession = (SingleScanSession) server.getSession(scanID);
String oldThreadName = Thread.currentThread().getName();
try {
@@ -69,10 +69,7 @@ public class NextBatchTask extends ScanTask<ScanBatch> {
return;
}
- long t1 = System.currentTimeMillis();
ScanBatch batch = scanSession.scanner.read();
- long t2 = System.currentTimeMillis();
- scanSession.nbTimes.addStat(t2 - t1);
// there should only be one thing on the queue at a time, so
// it should be ok to call add()
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
deleted file mode 100644
index d584242..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver.session;
-
-public class DefaultSessionComparator extends SessionComparator {
-
- @Override
- public int compareSession(Session sessionA, Session sessionB) {
-
- final long startTimeFirst = sessionA.startTime;
- final long startTimeSecond = sessionB.startTime;
-
- // use the lowest max idle time
- final long maxIdle = sessionA.maxIdleAccessTime < sessionB.maxIdleAccessTime
- ? sessionA.maxIdleAccessTime
- : sessionB.maxIdleAccessTime;
-
- final long currentTime = System.currentTimeMillis();
-
- /*
- * Multiply by -1 so that we have a sensical comparison. This means that if comparison < 0,
- * sessionA is newer. If comparison > 0, this means that session B is newer
- */
- int comparison = -1 * Long.compare(startTimeFirst, startTimeSecond);
-
- if (!(sessionA.lastExecTime == -1 && sessionB.lastExecTime == -1)) {
- if (comparison >= 0) {
- long idleTimeA = currentTime - sessionA.lastExecTime;
-
- /*
- * If session B is newer, let's make sure that we haven't reached the max idle time, where
- * we have to begin aging A
- */
- if (idleTimeA > sessionA.maxIdleAccessTime) {
- comparison = -1 * Long.valueOf(idleTimeA - maxIdle).intValue();
- }
- } else {
- long idleTimeB = currentTime - sessionB.lastExecTime;
-
- /*
- * If session A is newer, let's make sure that B hasn't reached the max idle time, where we
- * have to begin aging A
- */
- if (idleTimeB > sessionA.maxIdleAccessTime) {
- comparison = 1 * Long.valueOf(idleTimeB - maxIdle).intValue();
- }
- }
- }
-
- return comparison;
- }
-
-}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index 981832a..f8db1f4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@ -21,7 +21,6 @@ import java.util.List;
import java.util.Map;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.data.thrift.IterInfo;
@@ -30,13 +29,9 @@ import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.tserver.scan.ScanTask;
-public class MultiScanSession extends Session {
+public class MultiScanSession extends ScanSession {
public final KeyExtent threadPoolExtent;
- public final HashSet<Column> columnSet = new HashSet<>();
public final Map<KeyExtent,List<Range>> queries;
- public final List<IterInfo> ssiList;
- public final Map<String,Map<String,String>> ssio;
- public final Authorizations auths;
public final SamplerConfiguration samplerConfig;
public final long batchTimeOut;
public final String context;
@@ -53,11 +48,8 @@ public class MultiScanSession extends Session {
Map<KeyExtent,List<Range>> queries, List<IterInfo> ssiList,
Map<String,Map<String,String>> ssio, Authorizations authorizations,
SamplerConfiguration samplerConfig, long batchTimeOut, String context) {
- super(credentials);
+ super(credentials, new HashSet<>(), ssiList, ssio, authorizations);
this.queries = queries;
- this.ssiList = ssiList;
- this.ssio = ssio;
- this.auths = authorizations;
this.threadPoolExtent = threadPoolExtent;
this.samplerConfig = samplerConfig;
this.batchTimeOut = batchTimeOut;
@@ -65,20 +57,20 @@ public class MultiScanSession extends Session {
}
@Override
+ public Type getScanType() {
+ return Type.MULTI;
+ }
+
+ @Override
+ public String getTableId() {
+ return threadPoolExtent.getTableId().canonicalID();
+ }
+
+ @Override
public boolean cleanup() {
if (lookupTask != null)
lookupTask.cancel(true);
// the cancellation should provide us the safety to return true here
return true;
}
-
- /**
- * Ensure that the runnable actually runs
- */
- @Override
- public void run() {
- super.run();
- lookupTask.run();
- }
-
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
index a55de1e..5cde06c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
@@ -16,64 +16,148 @@
*/
package org.apache.accumulo.tserver.session;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.data.Column;
-import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.data.thrift.IterInfo;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+import org.apache.accumulo.core.spi.scan.ScanInfo;
import org.apache.accumulo.core.util.Stat;
-import org.apache.accumulo.tserver.scan.ScanTask;
-import org.apache.accumulo.tserver.tablet.ScanBatch;
-import org.apache.accumulo.tserver.tablet.Scanner;
-
-public class ScanSession extends Session {
- public final Stat nbTimes = new Stat();
- public final KeyExtent extent;
- public final Set<Column> columnSet;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public abstract class ScanSession extends Session implements ScanInfo {
+
+ public static class ScanMeasurer implements Runnable {
+
+ private ScanSession session;
+ private Runnable task;
+
+ ScanMeasurer(ScanSession session, Runnable task) {
+ this.session = session;
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ long t1 = System.currentTimeMillis();
+ task.run();
+ long t2 = System.currentTimeMillis();
+ session.finishedRun(t1, t2);
+ }
+
+ public ScanInfo getScanInfo() {
+ return session;
+ }
+ }
+
+ public static ScanMeasurer wrap(ScanSession scanInfo, Runnable r) {
+ return new ScanMeasurer(scanInfo, r);
+ }
+
+ private OptionalLong lastRunTime = OptionalLong.empty();
+ private Stat idleStats = new Stat();
+ public Stat runStats = new Stat();
+
+ public final HashSet<Column> columnSet;
public final List<IterInfo> ssiList;
public final Map<String,Map<String,String>> ssio;
public final Authorizations auths;
- public final AtomicBoolean interruptFlag = new AtomicBoolean();
- public long entriesReturned = 0;
- public long batchCount = 0;
- public volatile ScanTask<ScanBatch> nextBatchTask;
- public Scanner scanner;
- public final long readaheadThreshold;
- public final long batchTimeOut;
- public final String context;
-
- public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> columnSet,
- List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, Authorizations authorizations,
- long readaheadThreshold, long batchTimeOut, String context) {
+
+ ScanSession(TCredentials credentials, HashSet<Column> cols, List<IterInfo> ssiList,
+ Map<String,Map<String,String>> ssio, Authorizations auths) {
super(credentials);
- this.extent = extent;
- this.columnSet = columnSet;
+ this.columnSet = cols;
this.ssiList = ssiList;
this.ssio = ssio;
- this.auths = authorizations;
- this.readaheadThreshold = readaheadThreshold;
- this.batchTimeOut = batchTimeOut;
- this.context = context;
+ this.auths = auths;
+ }
+
+ @Override
+ public long getCreationTime() {
+ return startTime;
+ }
+
+ @Override
+ public OptionalLong getLastRunTime() {
+ return lastRunTime;
}
@Override
- public boolean cleanup() {
- final boolean ret;
- try {
- if (nextBatchTask != null)
- nextBatchTask.cancel(true);
- } finally {
- if (scanner != null)
- ret = scanner.close();
- else
- ret = true;
+ public Stats getRunTimeStats() { // run-time samples added by finishedRun; returns the runStats field itself, not a copy
+ return runStats;
+ }
+
+ @Override
+ public Stats getIdleTimeStats() { // idle-time samples added so far by finishedRun; returns the idleStats field itself
+ return idleStats;
+ }
+
+ @Override
+ public Stats getIdleTimeStats(long currentTime) { // idle stats with one extra sample: the idle period still in progress at currentTime
+ long idleTime = currentTime - getLastRunTime().orElse(getCreationTime()); // time since the last run finished, or since creation if never run
+ Preconditions.checkArgument(idleTime >= 0); // rejects a currentTime earlier than that reference point
+ Stat copy = idleStats.copy(); // presumably an independent copy, so the session's own idleStats stays untouched - confirm in Stat
+ copy.addStat(idleTime);
+ return copy;
+ }
+
+ @Override
+ public Set<Column> getFetchedColumns() { // read-only view of columnSet; callers cannot modify the set through it
+ return Collections.unmodifiableSet(columnSet);
+ }
+
+ private class IterConfImpl implements IteratorConfiguration { // presents one IterInfo, plus its options from the enclosing session's ssio map, as an IteratorConfiguration
+
+ private IterInfo ii; // the wrapped iterator description
+
+ IterConfImpl(IterInfo ii) {
+ this.ii = ii;
+ }
+
+ @Override
+ public String getIteratorClass() {
+ return ii.className;
+ }
+
+ @Override
+ public String getName() {
+ return ii.iterName;
+ }
+
+ @Override
+ public int getPriority() {
+ return ii.priority;
+ }
+
+ @Override
+ public Map<String,String> getOptions() {
+ Map<String,String> opts = ssio.get(ii.iterName); // options are looked up by iterator name
+ return opts == null || opts.isEmpty() ? Collections.emptyMap() // nothing configured: an immutable empty map, never null
+ : Collections.unmodifiableMap(opts); // otherwise a read-only view of the stored options
}
- return ret;
}
+ @Override
+ public Collection<IteratorConfiguration> getClientScanIterators() { // the ssiList entries, each exposed through an IterConfImpl
+ return Lists.transform(ssiList, IterConfImpl::new); // presumably Guava's Lists.transform (a lazy view that wraps on access) - confirm against the import
+ }
+
+ public void finishedRun(long start, long finish) { // records one completed run; ScanMeasurer.run passes System.currentTimeMillis() values
+ long idleTime = start - getLastRunTime().orElse(getCreationTime()); // gap between the previous finish (or creation, on the first run) and this start
+ long runTime = finish - start;
+ lastRunTime = OptionalLong.of(finish); // the next idle period is measured from this finish time
+ idleStats.addStat(idleTime);
+ runStats.addStat(runTime);
+ }
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index 32f7e34..6bd52e4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -19,7 +19,7 @@ package org.apache.accumulo.tserver.session;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.server.rpc.TServerUtils;
-public abstract class Session implements Runnable {
+public class Session {
enum State {
NEW, UNRESERVED, RESERVED, REMOVED
@@ -27,9 +27,7 @@ public abstract class Session implements Runnable {
public final String client;
public long lastAccessTime;
- protected volatile long lastExecTime = -1;
public long startTime;
- public long maxIdleAccessTime;
State state = State.NEW;
private final TCredentials credentials;
@@ -49,18 +47,4 @@ public abstract class Session implements Runnable {
public boolean cleanup() {
return true;
}
-
- @Override
- public void run() {
- lastExecTime = System.currentTimeMillis();
- }
-
- public void setLastExecutionTime(long lastExecTime) {
- this.lastExecTime = lastExecTime;
- }
-
- public long getLastExecutionTime() {
- return lastExecTime;
- }
-
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index c485698..1ba2491 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -231,9 +231,6 @@ public class SessionManager {
configuredIdle = maxUpdateIdle;
}
long idleTime = System.currentTimeMillis() - session.lastAccessTime;
- if (idleTime > session.maxIdleAccessTime) {
- session.maxIdleAccessTime = idleTime;
- }
if (idleTime > configuredIdle) {
log.info("Closing idle session from user={}, client={}, idle={}ms", session.getUser(),
session.client, idleTime);
@@ -318,8 +315,8 @@ public class SessionManager {
ScanTask nbt = null;
Table.ID tableID = null;
- if (session instanceof ScanSession) {
- ScanSession ss = (ScanSession) session;
+ if (session instanceof SingleScanSession) {
+ SingleScanSession ss = (SingleScanSession) session;
nbt = ss.nextBatchTask;
tableID = ss.extent.getTableId();
} else if (session instanceof MultiScanSession) {
@@ -365,8 +362,8 @@ public class SessionManager {
for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) {
Session session = entry.getValue();
- if (session instanceof ScanSession) {
- ScanSession ss = (ScanSession) session;
+ if (session instanceof SingleScanSession) {
+ SingleScanSession ss = (SingleScanSession) session;
ScanState state = ScanState.RUNNING;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
similarity index 80%
copy from server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
copy to server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
index a55de1e..fb3e29f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
@@ -16,9 +16,9 @@
*/
package org.apache.accumulo.tserver.session;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.data.Column;
@@ -26,18 +26,12 @@ import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.data.thrift.IterInfo;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.util.Stat;
import org.apache.accumulo.tserver.scan.ScanTask;
import org.apache.accumulo.tserver.tablet.ScanBatch;
import org.apache.accumulo.tserver.tablet.Scanner;
-public class ScanSession extends Session {
- public final Stat nbTimes = new Stat();
+public class SingleScanSession extends ScanSession {
public final KeyExtent extent;
- public final Set<Column> columnSet;
- public final List<IterInfo> ssiList;
- public final Map<String,Map<String,String>> ssio;
- public final Authorizations auths;
public final AtomicBoolean interruptFlag = new AtomicBoolean();
public long entriesReturned = 0;
public long batchCount = 0;
@@ -47,21 +41,27 @@ public class ScanSession extends Session {
public final long batchTimeOut;
public final String context;
- public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> columnSet,
+ public SingleScanSession(TCredentials credentials, KeyExtent extent, HashSet<Column> columnSet,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, Authorizations authorizations,
long readaheadThreshold, long batchTimeOut, String context) {
- super(credentials);
+ super(credentials, columnSet, ssiList, ssio, authorizations);
this.extent = extent;
- this.columnSet = columnSet;
- this.ssiList = ssiList;
- this.ssio = ssio;
- this.auths = authorizations;
this.readaheadThreshold = readaheadThreshold;
this.batchTimeOut = batchTimeOut;
this.context = context;
}
@Override
+ public Type getScanType() { // this session class always reports Type.SINGLE
+ return Type.SINGLE;
+ }
+
+ @Override
+ public String getTableId() { // canonical ID string of the table that this session's extent belongs to
+ return extent.getTableId().canonicalID();
+ }
+
+ @Override
public boolean cleanup() {
final boolean ret;
try {
@@ -75,5 +75,4 @@ public class ScanSession extends Session {
}
return ret;
}
-
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 18da8a4..dc5d684 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1540,7 +1540,8 @@ public class Tablet implements TabletCommitter {
try {
getTabletResources().executeMajorCompaction(getExtent(), new CompactionRunner(this, reason));
} catch (RuntimeException t) {
- log.debug("removing {} because we encountered an exception enqueing the CompactionRunner", reason, t);
+ log.debug("removing {} because we encountered an exception enqueing the CompactionRunner",
+ reason, t);
majorCompactionQueued.remove(reason);
throw t;
}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
deleted file mode 100644
index f7cc5cd..0000000
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver.session;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-public class SessionComparatorTest {
-
- @Test
- public void testSingleScanMultiScanNoRun() {
- long time = System.currentTimeMillis();
- ScanSession sessionA = emptyScanSession();
- sessionA.lastAccessTime = 0;
- sessionA.maxIdleAccessTime = 0;
- sessionA.startTime = time - 1000;
-
- MultiScanSession sessionB = emptyMultiScanSession();
- sessionB.lastAccessTime = 0;
- sessionB.maxIdleAccessTime = 1000;
- sessionB.startTime = time - 800;
-
- ScanSession sessionC = emptyScanSession();
- sessionC.lastAccessTime = 0;
- sessionC.maxIdleAccessTime = 1000;
- sessionC.startTime = time - 800;
-
- // a has never run, so it should be given priority
- SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
- assertEquals(-1, comparator.compareSession(sessionA, sessionB));
-
- // b is before a in queue, b has never run, but because a is single
- // we should be given priority
- assertEquals(1, comparator.compareSession(sessionB, sessionA));
-
- // now let's assume they have been executed
-
- assertEquals(1, comparator.compareSession(sessionA, sessionC));
-
- assertEquals(0, comparator.compareSession(sessionC, sessionC));
-
- }
-
- @Test
- public void testSingleScanRun() {
- long time = System.currentTimeMillis();
- ScanSession sessionA = emptyScanSession();
- sessionA.lastAccessTime = 0;
- sessionA.setLastExecutionTime(time);
- sessionA.maxIdleAccessTime = 1000;
- sessionA.startTime = time - 1000;
-
- ScanSession sessionB = emptyScanSession();
- sessionB.lastAccessTime = 0;
- sessionB.setLastExecutionTime(time - 2000);
- sessionB.maxIdleAccessTime = 1000;
- sessionB.startTime = time - 800;
-
- // b is newer
- SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
- assertEquals(1, comparator.compareSession(sessionA, sessionB));
-
- // b is before a in queue, b has never run, but because a is single
- // we should be given priority
- assertTrue(comparator.compareSession(sessionB, sessionA) < 0);
-
- sessionB.setLastExecutionTime(time);
- sessionA.setLastExecutionTime(time - 2000);
-
- assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
-
- // b is before a in queue, b has never run, but because a is single
- // we should be given priority
- int comp = comparator.compareSession(sessionB, sessionA);
- assertTrue("comparison is " + comp, comp >= 1);
- }
-
- @Test
- public void testSingleScanMultiScanRun() {
- long time = System.currentTimeMillis();
- ScanSession sessionA = emptyScanSession();
- sessionA.lastAccessTime = 0;
- sessionA.setLastExecutionTime(time);
- sessionA.maxIdleAccessTime = 1000;
- sessionA.startTime = time - 1000;
-
- MultiScanSession sessionB = emptyMultiScanSession();
- sessionB.lastAccessTime = 0;
- sessionB.setLastExecutionTime(time - 2000);
- sessionB.maxIdleAccessTime = 1000;
- sessionB.startTime = time - 800;
-
- // b is newer
- SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
- assertEquals(-1, comparator.compareSession(sessionA, sessionB));
-
- // b is before a in queue, b has never run, but because a is single
- // we should be given priority
- assertTrue(comparator.compareSession(sessionB, sessionA) > 0);
-
- sessionB.setLastExecutionTime(time);
- sessionA.setLastExecutionTime(time - 2000);
-
- assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
-
- // b is before a in queue, b has never run, but because a is single
- // we should be given priority
- int comp = comparator.compareSession(sessionB, sessionA);
- assertTrue("comparison is " + comp, comp > 0);
- }
-
- @Test
- public void testMultiScanRun() {
- long time = System.currentTimeMillis();
- ScanSession sessionA = emptyScanSession();
- sessionA.lastAccessTime = 0;
- sessionA.setLastExecutionTime(time);
- sessionA.maxIdleAccessTime = 1000;
- sessionA.startTime = time - 1000;
-
- ScanSession sessionB = emptyScanSession();
- sessionB.lastAccessTime = 0;
- sessionB.setLastExecutionTime(time - 2000);
- sessionB.maxIdleAccessTime = 1000;
- sessionB.startTime = time - 800;
-
- // b is newer
- SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
- assertEquals(1, comparator.compareSession(sessionA, sessionB));
-
- // b is before a in queue, b has never run, but because a is single
- // we should be given priority
- assertTrue(comparator.compareSession(sessionB, sessionA) < 0);
-
- sessionB.setLastExecutionTime(time);
- sessionA.setLastExecutionTime(time - 2000);
-
-
- assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
-
- // b is before a in queue, b has never run, but because a is single
- // we should be given priority
- int comp = comparator.compareSession(sessionB, sessionA);
- assertTrue("comparison is " + comp, comp >= 1);
- }
-
- private static ScanSession emptyScanSession() {
- return new ScanSession(null, null, null, null, null, null, 0, 0, null);
- }
-
- private static MultiScanSession emptyMultiScanSession() {
- return new MultiScanSession(null, null, null, null, null, null, null, 0, null);
- }
-}
diff --git a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
index e807dfb..320b9c2 100644
--- a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
+++ b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
@@ -68,7 +68,7 @@ public class IMMLGBenchmark {
}
for (Entry<String,Stat> entry : stats.entrySet()) {
- System.out.printf("%20s : %6.2f\n", entry.getKey(), entry.getValue().getAverage());
+ System.out.printf("%20s : %6.2f\n", entry.getKey(), entry.getValue().mean());
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index cbeb1e4..907f2c6 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -606,9 +606,8 @@ public class CollectTabletStats {
}
private static void printStat(String desc, Stat s) {
- System.out.printf(
- "\t\tDescription: [%30s] average: %,6.2f std dev: %,6.2f min: %,d max: %,d %n", desc,
- s.getAverage(), s.getStdDev(), s.getMin(), s.getMax());
+ System.out.printf("\t\tDescription: [%30s] average: %,6.2f min: %,d max: %,d %n", desc, // the std dev column is no longer printed
+ s.mean(), s.min(), s.max()); // mean()/min()/max() take the place of getAverage()/getMin()/getMax()
}