You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/08/08 23:22:56 UTC
[accumulo] branch master updated: ACCUMULO-4074 Refine scan executor interfaces (#580)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new dc3b79b ACCUMULO-4074 Refine scan executor interfaces (#580)
dc3b79b is described below
commit dc3b79bd19f387466ff32c85f68b74d0833e4a16
Author: Keith Turner <ke...@deenlo.com>
AuthorDate: Wed Aug 8 19:22:53 2018 -0400
ACCUMULO-4074 Refine scan executor interfaces (#580)
---
.../core/spi/scan/HintScanPrioritizer.java | 8 ++--
.../core/spi/scan/IdleRatioScanPrioritizer.java | 5 +-
.../accumulo/core/spi/scan/ScanDispatcher.java | 53 ++++++++++++++++------
.../accumulo/core/spi/scan/ScanPrioritizer.java | 19 +++++++-
.../core/spi/scan/SimpleScanDispatcher.java | 8 ++--
.../spi/scan/IdleRatioScanPrioritizerTest.java | 2 +-
.../core/spi/scan/SimpleScanDispatcherTest.java | 30 ++++++++++--
.../accumulo/server/conf/TableConfiguration.java | 9 ++--
.../tserver/TabletServerResourceManager.java | 15 +++++-
9 files changed, 115 insertions(+), 34 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
index 80c036f..01b1437 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
@@ -90,12 +90,12 @@ public class HintScanPrioritizer implements ScanPrioritizer {
}
@Override
- public Comparator<ScanInfo> createComparator(Map<String,String> options) {
+ public Comparator<ScanInfo> createComparator(CreateParameters params) {
int defaultPriority = Integer
- .parseInt(options.getOrDefault("default_priority", Integer.MAX_VALUE + ""));
+ .parseInt(params.getOptions().getOrDefault("default_priority", Integer.MAX_VALUE + ""));
- HintProblemAction hpa = HintProblemAction.valueOf(
- options.getOrDefault("bad_hint_action", HintProblemAction.LOG.name()).toUpperCase());
+ HintProblemAction hpa = HintProblemAction.valueOf(params.getOptions()
+ .getOrDefault("bad_hint_action", HintProblemAction.LOG.name()).toUpperCase());
Comparator<ScanInfo> cmp = Comparator.comparingInt(si -> getPriority(si, defaultPriority, hpa));
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
index c567460..9f5adf7 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
@@ -18,7 +18,6 @@
package org.apache.accumulo.core.spi.scan;
import java.util.Comparator;
-import java.util.Map;
import com.google.common.base.Preconditions;
@@ -37,8 +36,8 @@ public class IdleRatioScanPrioritizer implements ScanPrioritizer {
}
@Override
- public Comparator<ScanInfo> createComparator(Map<String,String> options) {
- Preconditions.checkArgument(options.isEmpty());
+ public Comparator<ScanInfo> createComparator(CreateParameters params) {
+ Preconditions.checkArgument(params.getOptions().isEmpty());
Comparator<ScanInfo> c1 = (si1, si2) -> {
long currTime = System.currentTimeMillis();
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
index a3bc3f1..9fc9d91 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
@@ -28,25 +28,52 @@ import com.google.common.base.Preconditions;
* @since 2.0.0
*/
public interface ScanDispatcher {
+
+ /**
+ * The method parameters for {@link ScanDispatcher#init(InitParameters)}. This interface exists so
+ * the API can evolve and additional parameters can be passed to the method in the future.
+ *
+ * @since 2.0.0
+ */
+ public static interface InitParameters {
+ /**
+ *
+ * @return The configured options. For example if the table properties
+ * {@code table.scan.dispatcher.opts.p1=abc} and
+ * {@code table.scan.dispatcher.opts.p9=123} were set, then this map would contain
+ * {@code p1=abc} and {@code p9=123}.
+ */
+ Map<String,String> getOptions();
+ }
+
/**
* This method is called once after a ScanDispatcher is instantiated.
+ */
+ public default void init(InitParameters params) {
+ Preconditions.checkArgument(params.getOptions().isEmpty(), "No options expected");
+ }
+
+ /**
+ * The method parameters for {@link ScanDispatcher#dispatch(DispatchParmaters)}. This interface
+ * exists so the API can evolve and additional parameters can be passed to the method in the
+ * future.
*
- * @param options
- * The configured options. For example if the table properties
- * {@code table.scan.dispatcher.opts.p1=abc} and
- * {@code table.scan.dispatcher.opts.p9=123} were set, then this map would contain
- * {@code p1=abc} and {@code p9=123}.
+ * @since 2.0.0
*/
- public default void init(Map<String,String> options) {
- Preconditions.checkArgument(options.isEmpty(), "No options expected");
+ public static interface DispatchParmaters {
+ /**
+ * @return information about the scan to be dispatched.
+ */
+ ScanInfo getScanInfo();
+
+ /**
+ * @return the currently configured scan executors
+ */
+ Map<String,ScanExecutor> getScanExecutors();
}
/**
- * @param scanInfo
- * Information about the scan.
- * @param scanExecutors
- * Information about the currently configured executors.
- * @return Should return one of the executors named in scanExecutors.keySet()
+ * @return Should return one of the executors named in params.getScanExecutors().keySet()
*/
- String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors);
+ String dispatch(DispatchParmaters params);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
index 51a4254..07fdf91 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
@@ -26,5 +26,22 @@ import java.util.Map;
* @since 2.0.0
*/
public interface ScanPrioritizer {
- Comparator<ScanInfo> createComparator(Map<String,String> options);
+
+ /**
+ * The method parameters for {@link ScanPrioritizer#createComparator(CreateParameters)}. This
+ * interface exists so the API can evolve and additional parameters can be passed to the method in
+ * the future.
+ *
+ * @since 2.0.0
+ */
+ public static interface CreateParameters {
+ /**
+ * @return The options configured for the scan prioritizer with properties of the form
+ * {@code tserver.scan.executors.<name>.prioritizer.opts.<key>=<value>}. Only the
+ * {@code <key>=<value>} portions of those properties end up in the returned map.
+ */
+ Map<String,String> getOptions();
+ }
+
+ Comparator<ScanInfo> createComparator(CreateParameters params);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
index f7ff73f..9653f4f 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
@@ -79,7 +79,8 @@ public class SimpleScanDispatcher implements ScanDispatcher {
}
@Override
- public void init(Map<String,String> options) {
+ public void init(InitParameters params) {
+ Map<String,String> options = params.getOptions();
Set<String> invalidOpts = Sets.difference(options.keySet(), VALID_OPTS);
Preconditions.checkArgument(invalidOpts.size() == 0, "Invalid options : %s", invalidOpts);
@@ -94,11 +95,12 @@ public class SimpleScanDispatcher implements ScanDispatcher {
}
@Override
- public String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors) {
+ public String dispatch(DispatchParmaters params) {
+ ScanInfo scanInfo = params.getScanInfo();
if (heedHints) {
String executor = scanInfo.getExecutionHints().get("executor");
if (executor != null) {
- if (scanExecutors.containsKey(executor)) {
+ if (params.getScanExecutors().containsKey(executor)) {
return executor;
} else {
switch (badHintAction) {
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
index 0128b20..38a4e0a 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
@@ -47,7 +47,7 @@ public class IdleRatioScanPrioritizerTest {
Collections.shuffle(scans);
Comparator<ScanInfo> comparator = new IdleRatioScanPrioritizer()
- .createComparator(Collections.emptyMap());
+ .createComparator(Collections::emptyMap);
Collections.sort(scans, comparator);
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
index 63f21ac..0a8c773 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher.DispatchParmaters;
import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
import org.junit.Assert;
import org.junit.Test;
@@ -36,6 +37,28 @@ public class SimpleScanDispatcherTest {
.endsWith(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME + ".prioritizer"));
}
+ private static class DispatchParametersImps implements DispatchParmaters {
+
+ private ScanInfo si;
+ private Map<String,ScanExecutor> se;
+
+ DispatchParametersImps(ScanInfo si, Map<String,ScanExecutor> se) {
+ this.si = si;
+ this.se = se;
+ }
+
+ @Override
+ public ScanInfo getScanInfo() {
+ return si;
+ }
+
+ @Override
+ public Map<String,ScanExecutor> getScanExecutors() {
+ return se;
+ }
+
+ }
+
private void runTest(Map<String,String> opts, Map<String,String> hints, String expectedSingle,
String expectedMulti) {
TestScanInfo msi = new TestScanInfo("a", Type.MULTI, 4);
@@ -44,15 +67,16 @@ public class SimpleScanDispatcherTest {
ssi.executionHints = hints;
SimpleScanDispatcher ssd1 = new SimpleScanDispatcher();
- ssd1.init(opts);
+
+ ssd1.init(() -> opts);
Map<String,ScanExecutor> executors = new HashMap<>();
executors.put("E1", null);
executors.put("E2", null);
executors.put("E3", null);
- Assert.assertEquals(expectedMulti, ssd1.dispatch(msi, executors));
- Assert.assertEquals(expectedSingle, ssd1.dispatch(ssi, executors));
+ Assert.assertEquals(expectedMulti, ssd1.dispatch(new DispatchParametersImps(msi, executors)));
+ Assert.assertEquals(expectedSingle, ssd1.dispatch(new DispatchParametersImps(ssi, executors)));
}
private void runTest(Map<String,String> opts, String expectedSingle, String expectedMulti) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index 706880c..7741cae 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.server.conf;
import static java.util.Objects.requireNonNull;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
@@ -237,13 +236,15 @@ public class TableConfiguration extends ObservableConfiguration {
ScanDispatcher newDispatcher = Property.createTableInstanceFromPropertyName(this,
Property.TABLE_SCAN_DISPATCHER, ScanDispatcher.class, null);
- Map<String,String> opts = new HashMap<>();
+ Builder<String,String> builder = ImmutableMap.builder();
getAllPropertiesWithPrefix(Property.TABLE_SCAN_DISPATCHER_OPTS).forEach((k, v) -> {
String optKey = k.substring(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey().length());
- opts.put(optKey, v);
+ builder.put(optKey, v);
});
- newDispatcher.init(Collections.unmodifiableMap(opts));
+ Map<String,String> opts = builder.build();
+
+ newDispatcher.init(() -> opts);
TablesScanDispatcher newRef = new TablesScanDispatcher(newDispatcher, count);
scanDispatcherRef.compareAndSet(currRef, newRef);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 14893e8..5e59025 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.BlockCacheManager;
import org.apache.accumulo.core.spi.cache.CacheType;
import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher.DispatchParmaters;
import org.apache.accumulo.core.spi.scan.ScanExecutor;
import org.apache.accumulo.core.spi.scan.ScanInfo;
import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
@@ -198,7 +199,7 @@ public class TabletServerResourceManager {
if (factory == null) {
queue = new LinkedBlockingQueue<>();
} else {
- Comparator<ScanInfo> comparator = factory.createComparator(sec.prioritizerOpts);
+ Comparator<ScanInfo> comparator = factory.createComparator(() -> sec.prioritizerOpts);
// function to extract scan session from runnable
Function<Runnable,ScanInfo> extractor = r -> ((ScanSession.ScanMeasurer) ((TraceRunnable) r)
@@ -937,7 +938,17 @@ public class TabletServerResourceManager {
} else if (tablet.isMeta()) {
scanExecutors.get("meta").execute(task);
} else {
- String scanExecutorName = dispatcher.dispatch(scanInfo, scanExecutorChoices);
+ String scanExecutorName = dispatcher.dispatch(new DispatchParmaters() {
+ @Override
+ public ScanInfo getScanInfo() {
+ return scanInfo;
+ }
+
+ @Override
+ public Map<String,ScanExecutor> getScanExecutors() {
+ return scanExecutorChoices;
+ }
+ });
ExecutorService executor = scanExecutors.get(scanExecutorName);
if (executor == null) {
log.warn(