You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/09/30 02:47:08 UTC
[3/9] git commit: ACCUMULO-3030 allow scanners to be interrupted
ACCUMULO-3030 allow scanners to be interrupted
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d8feff81
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d8feff81
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d8feff81
Branch: refs/heads/master
Commit: d8feff81e017ae08f1cbfcc9790c3490e7b10268
Parents: 2579d51
Author: Eric C. Newton <er...@gmail.com>
Authored: Tue Aug 5 07:57:34 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Sep 29 18:28:17 2014 -0400
----------------------------------------------------------------------
.../core/client/impl/ThriftScanner.java | 16 ++--
.../apache/accumulo/test/Accumulo3030IT.java | 83 ++++++++++++++++++++
2 files changed, 93 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d8feff81/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
index ccee661..245194b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@ -56,7 +56,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
import org.apache.accumulo.core.util.OpTimer;
import org.apache.accumulo.core.util.ThriftUtil;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.trace.instrument.Span;
import org.apache.accumulo.trace.instrument.Trace;
import org.apache.accumulo.trace.instrument.Tracer;
@@ -202,6 +201,9 @@ public class ThriftScanner {
Span span = Trace.start("scan");
try {
while (results == null && !scanState.finished) {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new AccumuloException("Thread interrupted");
+ }
if ((System.currentTimeMillis() - startTime) / 1000.0 > timeOut)
throw new ScanTimedOutException();
@@ -226,7 +228,7 @@ public class ThriftScanner {
else if (log.isTraceEnabled())
log.trace(error);
lastError = error;
- UtilWaitThread.sleep(100);
+ Thread.sleep(100);
} else {
// when a tablet splits we do want to continue scanning the low child
// of the split if we are already passed it
@@ -254,7 +256,7 @@ public class ThriftScanner {
log.trace(error);
lastError = error;
- UtilWaitThread.sleep(100);
+ Thread.sleep(100);
} finally {
locateSpan.stop();
}
@@ -288,7 +290,7 @@ public class ThriftScanner {
if (scanState.isolated)
throw new IsolationException();
- UtilWaitThread.sleep(100);
+ Thread.sleep(100);
} catch (NoSuchScanIDException e) {
error = "Scan failed, no such scan id " + scanState.scanID + " " + loc;
if (!error.equals(lastError))
@@ -323,7 +325,7 @@ public class ThriftScanner {
if (scanState.isolated)
throw new IsolationException();
- UtilWaitThread.sleep(100);
+ Thread.sleep(100);
} catch (TException e) {
TabletLocator.getInstance(instance, scanState.tableName).invalidateCache(loc.tablet_location);
error = "Scan failed, thrift error " + e.getClass().getName() + " " + e.getMessage() + " " + loc;
@@ -341,7 +343,7 @@ public class ThriftScanner {
if (scanState.isolated)
throw new IsolationException();
- UtilWaitThread.sleep(100);
+ Thread.sleep(100);
} finally {
scanLocation.stop();
}
@@ -352,6 +354,8 @@ public class ThriftScanner {
}
return results;
+ } catch (InterruptedException ex) {
+ throw new AccumuloException(ex);
} finally {
span.stop();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d8feff81/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java b/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java
new file mode 100644
index 0000000..bc56346
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class Accumulo3030IT extends ConfigurableMacIT {
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void test() throws Exception {
+ // make a table
+ final String tableName = getUniqueNames(1)[0];
+ final Connector conn = getConnector();
+ conn.tableOperations().create(tableName);
+ // make the world's slowest scanner
+ final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+ final IteratorSetting cfg = new IteratorSetting(100, SlowIterator.class);
+ SlowIterator.setSeekSleepTime(cfg, 99999*1000);
+ scanner.addScanIterator(cfg);
+ // create a thread to interrupt the slow scan
+ final Thread scanThread = Thread.currentThread();
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ // ensure the scan is running: not perfect, the metadata tables could be scanned, too.
+ String tserver = conn.instanceOperations().getTabletServers().iterator().next();
+ while (conn.instanceOperations().getActiveScans(tserver).size() < 1) {
+ UtilWaitThread.sleep(1000);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ // BAM!
+ scanThread.interrupt();
+ }
+ };
+ thread.start();
+ try {
+ // Use the scanner, expect problems
+ for (@SuppressWarnings("unused") Entry<Key,Value> entry : scanner) {
+ }
+ Assert.fail("Scan should not succeed");
+ } catch (Exception ex) {
+ } finally {
+ thread.join();
+ }
+ }
+
+}