You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/08/15 19:12:28 UTC

[2/6] accumulo git commit: ACCUMULO-4405 Prevent ThriftScanner from waiting Long.MAX_VALUE millis

ACCUMULO-4405 Prevent ThriftScanner from waiting Long.MAX_VALUE millis


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1e087bad
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1e087bad
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1e087bad

Branch: refs/heads/1.8
Commit: 1e087bad3681e0f1fc9d7d35a6ed753a0435a181
Parents: 4885470
Author: Josh Elser <el...@apache.org>
Authored: Mon Aug 15 13:04:45 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Aug 15 14:08:55 2016 -0400

----------------------------------------------------------------------
 .../core/client/impl/ThriftScanner.java         | 16 ++++---
 .../org/apache/accumulo/core/conf/Property.java |  2 +
 .../core/client/impl/ThriftScannerTest.java     | 45 ++++++++++++++++++++
 3 files changed, 56 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e087bad/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
index 39d3b32..d2fc259 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyValue;
@@ -196,10 +197,10 @@ public class ThriftScanner {
 
   }
 
-  private static long pause(long millis) throws InterruptedException {
+  static long pause(long millis, long maxSleep) throws InterruptedException {
     Thread.sleep(millis);
     // wait 2 * last time, with +-10% random jitter
-    return (long) (Math.max(millis * 2, 3000) * (.9 + Math.random() / 5));
+    return (long) (Math.min(millis * 2, maxSleep) * (.9 + Math.random() / 5));
   }
 
   public static List<KeyValue> scan(ClientContext context, ScanState scanState, int timeOut) throws ScanTimedOutException, AccumuloException,
@@ -211,6 +212,7 @@ public class ThriftScanner {
     String error = null;
     int tooManyFilesCount = 0;
     long sleepMillis = 100;
+    final long maxSleepTime = context.getConfiguration().getTimeInMillis(Property.GENERAL_MAX_SCANNER_RETRY_PERIOD);
 
     List<KeyValue> results = null;
 
@@ -245,7 +247,7 @@ public class ThriftScanner {
               else if (log.isTraceEnabled())
                 log.trace(error);
               lastError = error;
-              sleepMillis = pause(sleepMillis);
+              sleepMillis = pause(sleepMillis, maxSleepTime);
             } else {
               // when a tablet splits we do want to continue scanning the low child
               // of the split if we are already passed it
@@ -273,7 +275,7 @@ public class ThriftScanner {
               log.trace(error);
 
             lastError = error;
-            sleepMillis = pause(sleepMillis);
+            sleepMillis = pause(sleepMillis, maxSleepTime);
           } finally {
             locateSpan.stop();
           }
@@ -308,7 +310,7 @@ public class ThriftScanner {
           if (scanState.isolated)
             throw new IsolationException();
 
-          sleepMillis = pause(sleepMillis);
+          sleepMillis = pause(sleepMillis, maxSleepTime);
         } catch (NoSuchScanIDException e) {
           error = "Scan failed, no such scan id " + scanState.scanID + " " + loc;
           if (!error.equals(lastError))
@@ -343,7 +345,7 @@ public class ThriftScanner {
           if (scanState.isolated)
             throw new IsolationException();
 
-          sleepMillis = pause(sleepMillis);
+          sleepMillis = pause(sleepMillis, maxSleepTime);
         } catch (TException e) {
           TabletLocator.getLocator(context, scanState.tableId).invalidateCache(context.getInstance(), loc.tablet_location);
           error = "Scan failed, thrift error " + e.getClass().getName() + "  " + e.getMessage() + " " + loc;
@@ -361,7 +363,7 @@ public class ThriftScanner {
           if (scanState.isolated)
             throw new IsolationException();
 
-          sleepMillis = pause(sleepMillis);
+          sleepMillis = pause(sleepMillis, maxSleepTime);
         } finally {
           scanLocation.stop();
         }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e087bad/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index c427610..8b51a91 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -199,6 +199,8 @@ public enum Property {
       "The length of time that delegation tokens and secret keys are valid"),
   GENERAL_DELEGATION_TOKEN_UPDATE_INTERVAL("general.delegation.token.update.interval", "1d", PropertyType.TIMEDURATION,
       "The length of time between generation of new secret keys"),
+  GENERAL_MAX_SCANNER_RETRY_PERIOD("general.max.scanner.retry.period", "5s", PropertyType.TIMEDURATION,
+      "The maximum amount of time that a Scanner should wait before retrying a failed RPC"),
 
   // properties that are specific to master server behavior
   MASTER_PREFIX("master.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the master server"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e087bad/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftScannerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftScannerTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftScannerTest.java
new file mode 100644
index 0000000..a60ea3c
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftScannerTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Test calss for {@link ThriftScanner}.
+ */
+public class ThriftScannerTest {
+
+  private static boolean withinTenPercent(long expected, long actual) {
+    long delta = Math.max(expected / 10, 1);
+    return actual >= (expected - delta) && actual <= (expected + delta);
+  }
+
+  @Test
+  public void testPauseIncrease() throws Exception {
+    long newPause = ThriftScanner.pause(5L, 5000L);
+    assertTrue("New pause should be within [9,11], but was " + newPause, withinTenPercent(10L, newPause));
+  }
+
+  @Test
+  public void testMaxPause() throws Exception {
+    long maxPause = 1L;
+    long nextPause = ThriftScanner.pause(5L, maxPause);
+    assertTrue("New pause should be within [0,2], but was " + nextPause, withinTenPercent(maxPause, nextPause));
+  }
+}