You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/08/08 23:22:56 UTC

[accumulo] branch master updated: ACCUMULO-4074 Refine scan executor interfaces (#580)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new dc3b79b  ACCUMULO-4074 Refine scan executor interfaces (#580)
dc3b79b is described below

commit dc3b79bd19f387466ff32c85f68b74d0833e4a16
Author: Keith Turner <ke...@deenlo.com>
AuthorDate: Wed Aug 8 19:22:53 2018 -0400

    ACCUMULO-4074 Refine scan executor interfaces (#580)
---
 .../core/spi/scan/HintScanPrioritizer.java         |  8 ++--
 .../core/spi/scan/IdleRatioScanPrioritizer.java    |  5 +-
 .../accumulo/core/spi/scan/ScanDispatcher.java     | 53 ++++++++++++++++------
 .../accumulo/core/spi/scan/ScanPrioritizer.java    | 19 +++++++-
 .../core/spi/scan/SimpleScanDispatcher.java        |  8 ++--
 .../spi/scan/IdleRatioScanPrioritizerTest.java     |  2 +-
 .../core/spi/scan/SimpleScanDispatcherTest.java    | 30 ++++++++++--
 .../accumulo/server/conf/TableConfiguration.java   |  9 ++--
 .../tserver/TabletServerResourceManager.java       | 15 +++++-
 9 files changed, 115 insertions(+), 34 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
index 80c036f..01b1437 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
@@ -90,12 +90,12 @@ public class HintScanPrioritizer implements ScanPrioritizer {
   }
 
   @Override
-  public Comparator<ScanInfo> createComparator(Map<String,String> options) {
+  public Comparator<ScanInfo> createComparator(CreateParameters params) {
     int defaultPriority = Integer
-        .parseInt(options.getOrDefault("default_priority", Integer.MAX_VALUE + ""));
+        .parseInt(params.getOptions().getOrDefault("default_priority", Integer.MAX_VALUE + ""));
 
-    HintProblemAction hpa = HintProblemAction.valueOf(
-        options.getOrDefault("bad_hint_action", HintProblemAction.LOG.name()).toUpperCase());
+    HintProblemAction hpa = HintProblemAction.valueOf(params.getOptions()
+        .getOrDefault("bad_hint_action", HintProblemAction.LOG.name()).toUpperCase());
 
     Comparator<ScanInfo> cmp = Comparator.comparingInt(si -> getPriority(si, defaultPriority, hpa));
 
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
index c567460..9f5adf7 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
@@ -18,7 +18,6 @@
 package org.apache.accumulo.core.spi.scan;
 
 import java.util.Comparator;
-import java.util.Map;
 
 import com.google.common.base.Preconditions;
 
@@ -37,8 +36,8 @@ public class IdleRatioScanPrioritizer implements ScanPrioritizer {
   }
 
   @Override
-  public Comparator<ScanInfo> createComparator(Map<String,String> options) {
-    Preconditions.checkArgument(options.isEmpty());
+  public Comparator<ScanInfo> createComparator(CreateParameters params) {
+    Preconditions.checkArgument(params.getOptions().isEmpty());
 
     Comparator<ScanInfo> c1 = (si1, si2) -> {
       long currTime = System.currentTimeMillis();
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
index a3bc3f1..9fc9d91 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
@@ -28,25 +28,52 @@ import com.google.common.base.Preconditions;
  * @since 2.0.0
  */
 public interface ScanDispatcher {
+
+  /**
+   * The method parameters for {@link ScanDispatcher#init(InitParameters)}. This interface exists so
+   * the API can evolve and additional parameters can be passed to the method in the future.
+   *
+   * @since 2.0.0
+   */
+  public static interface InitParameters {
+    /**
+     *
+     * @return The configured options. For example if the table properties
+     *         {@code table.scan.dispatcher.opts.p1=abc} and
+     *         {@code table.scan.dispatcher.opts.p9=123} were set, then this map would contain
+     *         {@code p1=abc} and {@code p9=123}.
+     */
+    Map<String,String> getOptions();
+  }
+
   /**
    * This method is called once after a ScanDispatcher is instantiated.
+   */
+  public default void init(InitParameters params) {
+    Preconditions.checkArgument(params.getOptions().isEmpty(), "No options expected");
+  }
+
+  /**
+   * The method parameters for {@link ScanDispatcher#dispatch(DispatchParmaters)}. This interface
+   * exists so the API can evolve and additional parameters can be passed to the method in the
+   * future.
    *
-   * @param options
-   *          The configured options. For example if the table properties
-   *          {@code table.scan.dispatcher.opts.p1=abc} and
-   *          {@code table.scan.dispatcher.opts.p9=123} were set, then this map would contain
-   *          {@code p1=abc} and {@code p9=123}.
+   * @since 2.0.0
    */
-  public default void init(Map<String,String> options) {
-    Preconditions.checkArgument(options.isEmpty(), "No options expected");
+  public static interface DispatchParmaters {
+    /**
+     * @return information about the scan to be dispatched.
+     */
+    ScanInfo getScanInfo();
+
+    /**
+     * @return the currently configured scan executors
+     */
+    Map<String,ScanExecutor> getScanExecutors();
   }
 
   /**
-   * @param scanInfo
-   *          Information about the scan.
-   * @param scanExecutors
-   *          Information about the currently configured executors.
-   * @return Should return one of the executors named in scanExecutors.keySet()
+   * @return Should return one of the executors named in params.getScanExecutors().keySet()
    */
-  String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors);
+  String dispatch(DispatchParmaters params);
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
index 51a4254..07fdf91 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
@@ -26,5 +26,22 @@ import java.util.Map;
  * @since 2.0.0
  */
 public interface ScanPrioritizer {
-  Comparator<ScanInfo> createComparator(Map<String,String> options);
+
+  /**
+   * The method parameters for {@link ScanPrioritizer#createComparator(CreateParameters)}. This
+   * interface exists so the API can evolve and additional parameters can be passed to the method in
+   * the future.
+   *
+   * @since 2.0.0
+   */
+  public static interface CreateParameters {
+    /**
+     * @return The options configured for the scan prioritizer with properties of the form
+     *         {@code tserver.scan.executors.<name>.prioritizer.opts.<key>=<value>}. Only the
+     *         {@code <key>=<value>} portions of those properties end up in the returned map.
+     */
+    Map<String,String> getOptions();
+  }
+
+  Comparator<ScanInfo> createComparator(CreateParameters params);
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
index f7ff73f..9653f4f 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
@@ -79,7 +79,8 @@ public class SimpleScanDispatcher implements ScanDispatcher {
   }
 
   @Override
-  public void init(Map<String,String> options) {
+  public void init(InitParameters params) {
+    Map<String,String> options = params.getOptions();
     Set<String> invalidOpts = Sets.difference(options.keySet(), VALID_OPTS);
     Preconditions.checkArgument(invalidOpts.size() == 0, "Invalid options : %s", invalidOpts);
 
@@ -94,11 +95,12 @@ public class SimpleScanDispatcher implements ScanDispatcher {
   }
 
   @Override
-  public String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors) {
+  public String dispatch(DispatchParmaters params) {
+    ScanInfo scanInfo = params.getScanInfo();
     if (heedHints) {
       String executor = scanInfo.getExecutionHints().get("executor");
       if (executor != null) {
-        if (scanExecutors.containsKey(executor)) {
+        if (params.getScanExecutors().containsKey(executor)) {
           return executor;
         } else {
           switch (badHintAction) {
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
index 0128b20..38a4e0a 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
@@ -47,7 +47,7 @@ public class IdleRatioScanPrioritizerTest {
     Collections.shuffle(scans);
 
     Comparator<ScanInfo> comparator = new IdleRatioScanPrioritizer()
-        .createComparator(Collections.emptyMap());
+        .createComparator(Collections::emptyMap);
 
     Collections.sort(scans, comparator);
 
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
index 63f21ac..0a8c773 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher.DispatchParmaters;
 import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,6 +37,28 @@ public class SimpleScanDispatcherTest {
         .endsWith(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME + ".prioritizer"));
   }
 
+  private static class DispatchParametersImps implements DispatchParmaters {
+
+    private ScanInfo si;
+    private Map<String,ScanExecutor> se;
+
+    DispatchParametersImps(ScanInfo si, Map<String,ScanExecutor> se) {
+      this.si = si;
+      this.se = se;
+    }
+
+    @Override
+    public ScanInfo getScanInfo() {
+      return si;
+    }
+
+    @Override
+    public Map<String,ScanExecutor> getScanExecutors() {
+      return se;
+    }
+
+  }
+
   private void runTest(Map<String,String> opts, Map<String,String> hints, String expectedSingle,
       String expectedMulti) {
     TestScanInfo msi = new TestScanInfo("a", Type.MULTI, 4);
@@ -44,15 +67,16 @@ public class SimpleScanDispatcherTest {
     ssi.executionHints = hints;
 
     SimpleScanDispatcher ssd1 = new SimpleScanDispatcher();
-    ssd1.init(opts);
+
+    ssd1.init(() -> opts);
 
     Map<String,ScanExecutor> executors = new HashMap<>();
     executors.put("E1", null);
     executors.put("E2", null);
     executors.put("E3", null);
 
-    Assert.assertEquals(expectedMulti, ssd1.dispatch(msi, executors));
-    Assert.assertEquals(expectedSingle, ssd1.dispatch(ssi, executors));
+    Assert.assertEquals(expectedMulti, ssd1.dispatch(new DispatchParametersImps(msi, executors)));
+    Assert.assertEquals(expectedSingle, ssd1.dispatch(new DispatchParametersImps(ssi, executors)));
   }
 
   private void runTest(Map<String,String> opts, String expectedSingle, String expectedMulti) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index 706880c..7741cae 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.server.conf;
 import static java.util.Objects.requireNonNull;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
@@ -237,13 +236,15 @@ public class TableConfiguration extends ObservableConfiguration {
       ScanDispatcher newDispatcher = Property.createTableInstanceFromPropertyName(this,
           Property.TABLE_SCAN_DISPATCHER, ScanDispatcher.class, null);
 
-      Map<String,String> opts = new HashMap<>();
+      Builder<String,String> builder = ImmutableMap.builder();
       getAllPropertiesWithPrefix(Property.TABLE_SCAN_DISPATCHER_OPTS).forEach((k, v) -> {
         String optKey = k.substring(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey().length());
-        opts.put(optKey, v);
+        builder.put(optKey, v);
       });
 
-      newDispatcher.init(Collections.unmodifiableMap(opts));
+      Map<String,String> opts = builder.build();
+
+      newDispatcher.init(() -> opts);
 
       TablesScanDispatcher newRef = new TablesScanDispatcher(newDispatcher, count);
       scanDispatcherRef.compareAndSet(currRef, newRef);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 14893e8..5e59025 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.spi.cache.BlockCacheManager;
 import org.apache.accumulo.core.spi.cache.CacheType;
 import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher.DispatchParmaters;
 import org.apache.accumulo.core.spi.scan.ScanExecutor;
 import org.apache.accumulo.core.spi.scan.ScanInfo;
 import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
@@ -198,7 +199,7 @@ public class TabletServerResourceManager {
       if (factory == null) {
         queue = new LinkedBlockingQueue<>();
       } else {
-        Comparator<ScanInfo> comparator = factory.createComparator(sec.prioritizerOpts);
+        Comparator<ScanInfo> comparator = factory.createComparator(() -> sec.prioritizerOpts);
 
         // function to extract scan scan session from runnable
         Function<Runnable,ScanInfo> extractor = r -> ((ScanSession.ScanMeasurer) ((TraceRunnable) r)
@@ -937,7 +938,17 @@ public class TabletServerResourceManager {
     } else if (tablet.isMeta()) {
       scanExecutors.get("meta").execute(task);
     } else {
-      String scanExecutorName = dispatcher.dispatch(scanInfo, scanExecutorChoices);
+      String scanExecutorName = dispatcher.dispatch(new DispatchParmaters() {
+        @Override
+        public ScanInfo getScanInfo() {
+          return scanInfo;
+        }
+
+        @Override
+        public Map<String,ScanExecutor> getScanExecutors() {
+          return scanExecutorChoices;
+        }
+      });
       ExecutorService executor = scanExecutors.get(scanExecutorName);
       if (executor == null) {
         log.warn(