You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/08/15 19:12:28 UTC
[2/6] accumulo git commit: ACCUMULO-4405 Prevent ThriftScanner from
waiting Long.MAX_VALUE millis
ACCUMULO-4405 Prevent ThriftScanner from waiting Long.MAX_VALUE millis
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1e087bad
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1e087bad
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1e087bad
Branch: refs/heads/1.8
Commit: 1e087bad3681e0f1fc9d7d35a6ed753a0435a181
Parents: 4885470
Author: Josh Elser <el...@apache.org>
Authored: Mon Aug 15 13:04:45 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Aug 15 14:08:55 2016 -0400
----------------------------------------------------------------------
.../core/client/impl/ThriftScanner.java | 16 ++++---
.../org/apache/accumulo/core/conf/Property.java | 2 +
.../core/client/impl/ThriftScannerTest.java | 45 ++++++++++++++++++++
3 files changed, 56 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e087bad/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
index 39d3b32..d2fc259 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyValue;
@@ -196,10 +197,10 @@ public class ThriftScanner {
}
- private static long pause(long millis) throws InterruptedException {
+ static long pause(long millis, long maxSleep) throws InterruptedException {
Thread.sleep(millis);
// wait 2 * last time, with +-10% random jitter
- return (long) (Math.max(millis * 2, 3000) * (.9 + Math.random() / 5));
+ return (long) (Math.min(millis * 2, maxSleep) * (.9 + Math.random() / 5));
}
public static List<KeyValue> scan(ClientContext context, ScanState scanState, int timeOut) throws ScanTimedOutException, AccumuloException,
@@ -211,6 +212,7 @@ public class ThriftScanner {
String error = null;
int tooManyFilesCount = 0;
long sleepMillis = 100;
+ final long maxSleepTime = context.getConfiguration().getTimeInMillis(Property.GENERAL_MAX_SCANNER_RETRY_PERIOD);
List<KeyValue> results = null;
@@ -245,7 +247,7 @@ public class ThriftScanner {
else if (log.isTraceEnabled())
log.trace(error);
lastError = error;
- sleepMillis = pause(sleepMillis);
+ sleepMillis = pause(sleepMillis, maxSleepTime);
} else {
// when a tablet splits we do want to continue scanning the low child
// of the split if we are already passed it
@@ -273,7 +275,7 @@ public class ThriftScanner {
log.trace(error);
lastError = error;
- sleepMillis = pause(sleepMillis);
+ sleepMillis = pause(sleepMillis, maxSleepTime);
} finally {
locateSpan.stop();
}
@@ -308,7 +310,7 @@ public class ThriftScanner {
if (scanState.isolated)
throw new IsolationException();
- sleepMillis = pause(sleepMillis);
+ sleepMillis = pause(sleepMillis, maxSleepTime);
} catch (NoSuchScanIDException e) {
error = "Scan failed, no such scan id " + scanState.scanID + " " + loc;
if (!error.equals(lastError))
@@ -343,7 +345,7 @@ public class ThriftScanner {
if (scanState.isolated)
throw new IsolationException();
- sleepMillis = pause(sleepMillis);
+ sleepMillis = pause(sleepMillis, maxSleepTime);
} catch (TException e) {
TabletLocator.getLocator(context, scanState.tableId).invalidateCache(context.getInstance(), loc.tablet_location);
error = "Scan failed, thrift error " + e.getClass().getName() + " " + e.getMessage() + " " + loc;
@@ -361,7 +363,7 @@ public class ThriftScanner {
if (scanState.isolated)
throw new IsolationException();
- sleepMillis = pause(sleepMillis);
+ sleepMillis = pause(sleepMillis, maxSleepTime);
} finally {
scanLocation.stop();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e087bad/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index c427610..8b51a91 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -199,6 +199,8 @@ public enum Property {
"The length of time that delegation tokens and secret keys are valid"),
GENERAL_DELEGATION_TOKEN_UPDATE_INTERVAL("general.delegation.token.update.interval", "1d", PropertyType.TIMEDURATION,
"The length of time between generation of new secret keys"),
+ GENERAL_MAX_SCANNER_RETRY_PERIOD("general.max.scanner.retry.period", "5s", PropertyType.TIMEDURATION,
+ "The maximum amount of time that a Scanner should wait before retrying a failed RPC"),
// properties that are specific to master server behavior
MASTER_PREFIX("master.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the master server"),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e087bad/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftScannerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftScannerTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftScannerTest.java
new file mode 100644
index 0000000..a60ea3c
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftScannerTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Test calss for {@link ThriftScanner}.
+ */
+public class ThriftScannerTest {
+
+ private static boolean withinTenPercent(long expected, long actual) {
+ long delta = Math.max(expected / 10, 1);
+ return actual >= (expected - delta) && actual <= (expected + delta);
+ }
+
+ @Test
+ public void testPauseIncrease() throws Exception {
+ long newPause = ThriftScanner.pause(5L, 5000L);
+ assertTrue("New pause should be within [9,11], but was " + newPause, withinTenPercent(10L, newPause));
+ }
+
+ @Test
+ public void testMaxPause() throws Exception {
+ long maxPause = 1L;
+ long nextPause = ThriftScanner.pause(5L, maxPause);
+ assertTrue("New pause should be within [0,2], but was " + nextPause, withinTenPercent(maxPause, nextPause));
+ }
+}