You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/08/02 18:57:47 UTC

[GitHub] keith-turner closed pull request #509: ACCUMULO-3510 Updated scheduler to use comparators for sessions

keith-turner closed pull request #509: ACCUMULO-3510 Updated scheduler to use comparators for sessions
URL: https://github.com/apache/accumulo/pull/509
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index fc423acca5..b127ffb46b 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -542,6 +542,9 @@
   TSERV_SUMMARY_RETRIEVAL_THREADS("tserver.summary.retrieval.threads", "10", PropertyType.COUNT,
       "The number of threads on each tablet server available to retrieve"
           + " summary data, that is not currently in cache, from RFiles."),
+  TSERV_SESSION_COMPARATOR_CLASS("tserver.summary.comparator.class", "", PropertyType.CLASSNAME,
+      "A customizable Scan session comparator. Note that by default, the value is empty"
+          + " and thus uses no session comparator"),
 
   // accumulo garbage collector properties
   GC_PREFIX("gc.", null, PropertyType.PREFIX,
diff --git a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
new file mode 100644
index 0000000000..f688010f16
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AccumuloUncaughtExceptionHandler implements UncaughtExceptionHandler {
+
+  private static final Logger log = LoggerFactory.getLogger(AccumuloUncaughtExceptionHandler.class);
+
+  @Override
+  public void uncaughtException(Thread t, Throwable e) {
+
+    log.error(String.format("Caught an exception in %s.  Shutting down.", t), e);
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
index d8b307c36c..e085255b0e 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
@@ -26,6 +26,11 @@
 public class NamingThreadFactory implements ThreadFactory {
   private static final Logger log = LoggerFactory.getLogger(NamingThreadFactory.class);
 
+  // @formatter:off
+  private static final AccumuloUncaughtExceptionHandler uncaughtHandler =
+    new AccumuloUncaughtExceptionHandler();
+  // @formatter:on
+
   private AtomicInteger threadNum = new AtomicInteger(1);
   private String name;
 
@@ -35,7 +40,10 @@ public NamingThreadFactory(String name) {
 
   @Override
   public Thread newThread(Runnable r) {
-    return new Daemon(new LoggingRunnable(log, r), name + " " + threadNum.getAndIncrement());
+    Thread thread = new Daemon(new LoggingRunnable(log, r),
+        name + " " + threadNum.getAndIncrement());
+    thread.setUncaughtExceptionHandler(uncaughtHandler);
+    return thread;
   }
 
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 71cebbe014..acd9d2f3ad 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -62,6 +63,7 @@
 import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
 import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
 import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.tserver.session.SessionComparator;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.htrace.wrappers.TraceExecutorService;
 import org.slf4j.Logger;
@@ -160,6 +162,38 @@ private ExecutorService createIdlingEs(Property max, String name, long timeout,
     return addEs(max, name, tp);
   }
 
+  /**
+   * If we cannot instantiate the comparator we will default to the linked blocking queue comparator
+   *
+   * @param max
+   *          max number of threads
+   * @param comparator
+   *          comparator property
+   * @param name
+   *          name passed to the thread factory
+   * @return priority executor
+   */
+  private ExecutorService createPriorityExecutor(Property max, Property comparator, String name) {
+    int maxThreads = conf.getSystemConfiguration().getCount(max);
+
+    String comparatorClazz = conf.getSystemConfiguration().get(comparator);
+
+    if (null == comparatorClazz || comparatorClazz.length() == 0) {
+      log.debug("Using no comparator");
+      return createEs(max, name, new LinkedBlockingQueue<>());
+    } else {
+      SessionComparator comparatorObj = Property.createInstanceFromPropertyName(
+          conf.getSystemConfiguration(), comparator, SessionComparator.class, null);
+      if (null != comparatorObj) {
+        log.debug("Using priority based scheduler {}", comparatorClazz);
+        return createEs(max, name, new PriorityBlockingQueue<>(maxThreads, comparatorObj));
+      } else {
+        log.debug("Using no comparator");
+        return createEs(max, name, new LinkedBlockingQueue<>());
+      }
+    }
+  }
+
   private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
     int maxThreads = conf.getSystemConfiguration().getCount(max);
     ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
@@ -251,7 +285,8 @@ public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) {
 
     activeAssignments = new ConcurrentHashMap<>();
 
-    readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read ahead");
+    readAheadThreadPool = createPriorityExecutor(Property.TSERV_READ_AHEAD_MAXCONCURRENT,
+        Property.TSERV_SESSION_COMPARATOR_CLASS, "tablet read ahead");
     defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT,
         "metadata tablets read ahead");
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
new file mode 100644
index 0000000000..d584242fab
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+public class DefaultSessionComparator extends SessionComparator {
+
+  @Override
+  public int compareSession(Session sessionA, Session sessionB) {
+
+    final long startTimeFirst = sessionA.startTime;
+    final long startTimeSecond = sessionB.startTime;
+
+    // use the lowest max idle time
+    final long maxIdle = sessionA.maxIdleAccessTime < sessionB.maxIdleAccessTime
+        ? sessionA.maxIdleAccessTime
+        : sessionB.maxIdleAccessTime;
+
+    final long currentTime = System.currentTimeMillis();
+
+    /*
+     * Multiply by -1 so that we have a sensical comparison. This means that if comparison < 0,
+     * sessionA is newer. If comparison > 0, this means that session B is newer
+     */
+    int comparison = -1 * Long.compare(startTimeFirst, startTimeSecond);
+
+    if (!(sessionA.lastExecTime == -1 && sessionB.lastExecTime == -1)) {
+      if (comparison >= 0) {
+        long idleTimeA = currentTime - sessionA.lastExecTime;
+
+        /*
+         * If session B is newer, let's make sure that we haven't reached the max idle time, where
+         * we have to begin aging A
+         */
+        if (idleTimeA > sessionA.maxIdleAccessTime) {
+          comparison = -1 * Long.valueOf(idleTimeA - maxIdle).intValue();
+        }
+      } else {
+        long idleTimeB = currentTime - sessionB.lastExecTime;
+
+        /*
+         * If session A is newer, let's make sure that B hasn't reached the max idle time, where we
+         * have to begin aging A
+         */
+        if (idleTimeB > sessionA.maxIdleAccessTime) {
+          comparison = 1 * Long.valueOf(idleTimeB - maxIdle).intValue();
+        }
+      }
+    }
+
+    return comparison;
+  }
+
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index 4285a51446..981832af6a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@ -71,4 +71,14 @@ public boolean cleanup() {
     // the cancellation should provide us the safety to return true here
     return true;
   }
+
+  /**
+   * Ensure that the runnable actually runs
+   */
+  @Override
+  public void run() {
+    super.run();
+    lookupTask.run();
+  }
+
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index eed45cf073..32f7e34851 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -19,15 +19,17 @@
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.server.rpc.TServerUtils;
 
-public class Session {
+public abstract class Session implements Runnable {
 
   enum State {
     NEW, UNRESERVED, RESERVED, REMOVED
   }
 
   public final String client;
-  long lastAccessTime;
+  public long lastAccessTime;
+  protected volatile long lastExecTime = -1;
   public long startTime;
+  public long maxIdleAccessTime;
   State state = State.NEW;
   private final TCredentials credentials;
 
@@ -47,4 +49,18 @@ public TCredentials getCredentials() {
   public boolean cleanup() {
     return true;
   }
+
+  @Override
+  public void run() {
+    lastExecTime = System.currentTimeMillis();
+  }
+
+  public void setLastExecutionTime(long lastExecTime) {
+    this.lastExecTime = lastExecTime;
+  }
+
+  public long getLastExecutionTime() {
+    return lastExecTime;
+  }
+
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
new file mode 100644
index 0000000000..dcfa1d4bbd
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import java.util.Comparator;
+
+public abstract class SessionComparator implements Comparator<Runnable> {
+
+  @Override
+  public int compare(Runnable sessionA, Runnable sessionB) {
+    if (sessionA instanceof Session && sessionB instanceof Session)
+      return compareSession((Session) sessionA, (Session) sessionB);
+    else
+      return 0;
+  }
+
+  public abstract int compareSession(final Session sessionA, final Session sessionB);
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 724d23c18f..c48569824d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -231,6 +231,9 @@ private void sweep(final long maxIdle, final long maxUpdateIdle) {
             configuredIdle = maxUpdateIdle;
           }
           long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+          if (idleTime > session.maxIdleAccessTime) {
+            session.maxIdleAccessTime = idleTime;
+          }
           if (idleTime > configuredIdle) {
             log.info("Closing idle session from user={}, client={}, idle={}ms", session.getUser(),
                 session.client, idleTime);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
new file mode 100644
index 0000000000..28e6ef22bb
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+public class SingleRangePriorityComparator extends DefaultSessionComparator {
+
+  @Override
+  public int compareSession(Session sessionA, Session sessionB) {
+    int priority = super.compareSession(sessionA, sessionB);
+
+    if (sessionA instanceof MultiScanSession && sessionB instanceof ScanSession) {
+      if (priority < 0) {
+        priority *= -1;
+      }
+    } else if (sessionB instanceof MultiScanSession && sessionA instanceof ScanSession) {
+      if (priority > 0) {
+        priority *= -1;
+      }
+    }
+    return priority;
+  }
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 9ad246cc3a..e8899ab7e6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1537,7 +1537,14 @@ public synchronized void initiateMajorCompaction(MajorCompactionReason reason) {
 
     majorCompactionQueued.add(reason);
 
-    getTabletResources().executeMajorCompaction(getExtent(), new CompactionRunner(this, reason));
+    try {
+      getTabletResources().executeMajorCompaction(getExtent(), new CompactionRunner(this, reason));
+    } catch (RuntimeException t) {
+      log.debug("removing {} because we encountered an exception enqueing the CompactionRunner",
+          reason, t);
+      majorCompactionQueued.remove(reason);
+      throw t;
+    }
   }
 
   /**
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
new file mode 100644
index 0000000000..40d85d760c
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class SessionComparatorTest {
+
+  @Test
+  public void testSingleScanMultiScanNoRun() {
+    long time = System.currentTimeMillis();
+    ScanSession sessionA = emptyScanSession();
+    sessionA.lastAccessTime = 0;
+    sessionA.maxIdleAccessTime = 0;
+    sessionA.startTime = time - 1000;
+
+    MultiScanSession sessionB = emptyMultiScanSession();
+    sessionB.lastAccessTime = 0;
+    sessionB.maxIdleAccessTime = 1000;
+    sessionB.startTime = time - 800;
+
+    ScanSession sessionC = emptyScanSession();
+    sessionC.lastAccessTime = 0;
+    sessionC.maxIdleAccessTime = 1000;
+    sessionC.startTime = time - 800;
+
+    // a has never run, so it should be given priority
+    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
+    assertEquals(-1, comparator.compareSession(sessionA, sessionB));
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    assertEquals(1, comparator.compareSession(sessionB, sessionA));
+
+    // now let's assume they have been executed
+
+    assertEquals(1, comparator.compareSession(sessionA, sessionC));
+
+    assertEquals(0, comparator.compareSession(sessionC, sessionC));
+
+  }
+
+  @Test
+  public void testSingleScanRun() {
+    long time = System.currentTimeMillis();
+    ScanSession sessionA = emptyScanSession();
+    sessionA.lastAccessTime = 0;
+    sessionA.setLastExecutionTime(time);
+    sessionA.maxIdleAccessTime = 1000;
+    sessionA.startTime = time - 1000;
+
+    ScanSession sessionB = emptyScanSession();
+    sessionB.lastAccessTime = 0;
+    sessionB.setLastExecutionTime(time - 2000);
+    sessionB.maxIdleAccessTime = 1000;
+    sessionB.startTime = time - 800;
+
+    // b is newer
+    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
+    assertEquals(1, comparator.compareSession(sessionA, sessionB));
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    assertTrue(comparator.compareSession(sessionB, sessionA) < 0);
+
+    sessionB.setLastExecutionTime(time);
+    sessionA.setLastExecutionTime(time - 2000);
+
+    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    int comp = comparator.compareSession(sessionB, sessionA);
+    assertTrue("comparison is " + comp, comp >= 1);
+  }
+
+  @Test
+  public void testSingleScanMultiScanRun() {
+    long time = System.currentTimeMillis();
+    ScanSession sessionA = emptyScanSession();
+    sessionA.lastAccessTime = 0;
+    sessionA.setLastExecutionTime(time);
+    sessionA.maxIdleAccessTime = 1000;
+    sessionA.startTime = time - 1000;
+
+    MultiScanSession sessionB = emptyMultiScanSession();
+    sessionB.lastAccessTime = 0;
+    sessionB.setLastExecutionTime(time - 2000);
+    sessionB.maxIdleAccessTime = 1000;
+    sessionB.startTime = time - 800;
+
+    // b is newer
+    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
+    assertEquals(-1, comparator.compareSession(sessionA, sessionB));
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    assertTrue(comparator.compareSession(sessionB, sessionA) > 0);
+
+    sessionB.setLastExecutionTime(time);
+    sessionA.setLastExecutionTime(time - 2000);
+
+    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    int comp = comparator.compareSession(sessionB, sessionA);
+    assertTrue("comparison is " + comp, comp > 0);
+  }
+
+  @Test
+  public void testMultiScanRun() {
+    long time = System.currentTimeMillis();
+    ScanSession sessionA = emptyScanSession();
+    sessionA.lastAccessTime = 0;
+    sessionA.setLastExecutionTime(time);
+    sessionA.maxIdleAccessTime = 1000;
+    sessionA.startTime = time - 1000;
+
+    ScanSession sessionB = emptyScanSession();
+    sessionB.lastAccessTime = 0;
+    sessionB.setLastExecutionTime(time - 2000);
+    sessionB.maxIdleAccessTime = 1000;
+    sessionB.startTime = time - 800;
+
+    // b is newer
+    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
+    assertEquals(1, comparator.compareSession(sessionA, sessionB));
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    assertTrue(comparator.compareSession(sessionB, sessionA) < 0);
+
+    sessionB.setLastExecutionTime(time);
+    sessionA.setLastExecutionTime(time - 2000);
+
+    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    int comp = comparator.compareSession(sessionB, sessionA);
+    assertTrue("comparison is " + comp, comp >= 1);
+  }
+
+  private static ScanSession emptyScanSession() {
+    return new ScanSession(null, null, null, null, null, null, 0, 0, null);
+  }
+
+  private static MultiScanSession emptyMultiScanSession() {
+    return new MultiScanSession(null, null, null, null, null, null, null, 0, null);
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services