You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/09/30 02:47:09 UTC

[4/9] git commit: ACCUMULO-3180 Better test naming and some extra functionality to SlowIterator

ACCUMULO-3180 Better test naming and some extra functionality to SlowIterator


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5459950d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5459950d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5459950d

Branch: refs/heads/1.5
Commit: 5459950dd70f7df01cf5dbebb8324e80cb034548
Parents: d8feff8
Author: Josh Elser <el...@apache.org>
Authored: Mon Sep 29 19:33:33 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon Sep 29 19:33:33 2014 -0400

----------------------------------------------------------------------
 .../accumulo/test/functional/SlowIterator.java  |  39 ++++--
 .../apache/accumulo/test/Accumulo3030IT.java    |  83 -------------
 .../test/AllowScansToBeInterruptedIT.java       | 123 +++++++++++++++++++
 3 files changed, 155 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5459950d/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
index a426b7f..a9b254e 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
@@ -17,9 +17,13 @@
 package org.apache.accumulo.test.functional;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
 
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -27,24 +31,45 @@ import org.apache.accumulo.core.iterators.WrappingIterator;
 import org.apache.accumulo.core.util.UtilWaitThread;
 
 public class SlowIterator extends WrappingIterator {
-  
-  long sleepTime;
-  
+
+  static private final String SLEEP_TIME = "sleepTime";
+  static private final String SEEK_SLEEP_TIME = "seekSleepTime";
+  private long sleepTime = 0;
+  private long seekSleepTime = 0;
+
+  public static void setSleepTime(IteratorSetting is, long millis) {
+    is.addOption(SLEEP_TIME, Long.toString(millis));
+  }
+
+  public static void setSeekSleepTime(IteratorSetting is, long t) {
+    is.addOption(SEEK_SLEEP_TIME, Long.toString(t));
+  }
+
   @Override
   public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
     throw new UnsupportedOperationException();
   }
-  
+
   @Override
   public void next() throws IOException {
     UtilWaitThread.sleep(sleepTime);
     super.next();
   }
-  
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    UtilWaitThread.sleep(seekSleepTime);
+    super.seek(range, columnFamilies, inclusive);
+  }
+
   @Override
   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
     super.init(source, options, env);
-    sleepTime = Long.parseLong(options.get("sleepTime"));
+    if (options.containsKey(SLEEP_TIME))
+      sleepTime = Long.parseLong(options.get(SLEEP_TIME));
+
+    if (options.containsKey(SEEK_SLEEP_TIME))
+      seekSleepTime = Long.parseLong(options.get(SEEK_SLEEP_TIME));
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5459950d/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java b/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java
deleted file mode 100644
index bc56346..0000000
--- a/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test;
-
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.accumulo.test.functional.SlowIterator;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class Accumulo3030IT extends ConfigurableMacIT {
-  
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setNumTservers(1);
-  }
-
-  @Test(timeout = 60 * 1000)
-  public void test() throws Exception {
-    // make a table
-    final String tableName = getUniqueNames(1)[0];
-    final Connector conn = getConnector();
-    conn.tableOperations().create(tableName);
-    // make the world's slowest scanner
-    final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
-    final IteratorSetting cfg = new IteratorSetting(100, SlowIterator.class);
-    SlowIterator.setSeekSleepTime(cfg, 99999*1000);
-    scanner.addScanIterator(cfg);
-    // create a thread to interrupt the slow scan
-    final Thread scanThread = Thread.currentThread();
-    Thread thread = new Thread() {
-      @Override
-      public void run() {
-        try {
-          // ensure the scan is running: not perfect, the metadata tables could be scanned, too.
-          String tserver = conn.instanceOperations().getTabletServers().iterator().next();
-          while (conn.instanceOperations().getActiveScans(tserver).size() < 1) {
-            UtilWaitThread.sleep(1000);
-          }
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-        // BAM!
-        scanThread.interrupt();
-      }
-    };
-    thread.start();
-    try {
-      // Use the scanner, expect problems
-      for (@SuppressWarnings("unused") Entry<Key,Value> entry : scanner) {
-      }
-      Assert.fail("Scan should not succeed");
-    } catch (Exception ex) {
-    } finally {
-      thread.join();
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5459950d/test/src/test/java/org/apache/accumulo/test/AllowScansToBeInterruptedIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/AllowScansToBeInterruptedIT.java b/test/src/test/java/org/apache/accumulo/test/AllowScansToBeInterruptedIT.java
new file mode 100644
index 0000000..bbef6ea
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/AllowScansToBeInterruptedIT.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.ActiveScan;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class AllowScansToBeInterruptedIT {
+  private static final Logger log = Logger.getLogger(AllowScansToBeInterruptedIT.class);
+
+  public static TemporaryFolder folder = new TemporaryFolder();
+  private MiniAccumuloCluster accumulo;
+  private String secret = "secret";
+
+  @Before
+  public void setUp() throws Exception {
+    folder.create();
+    log.info("Using MAC at " + folder.getRoot());
+    MiniAccumuloConfig cfg = new MiniAccumuloConfig(folder.getRoot(), secret);
+    cfg.setNumTservers(1);
+    accumulo = new MiniAccumuloCluster(cfg);
+    accumulo.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    accumulo.stop();
+    folder.delete();
+  }
+
+  Connector getConnector() throws AccumuloException, AccumuloSecurityException {
+    ZooKeeperInstance zki = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
+    return zki.getConnector("root", new PasswordToken(secret));
+  }
+
+  @Test(timeout = 60 * 1000)
+  public void test() throws Exception {
+    // make a table
+    final String tableName = "test";
+    final Connector conn = getConnector();
+    conn.tableOperations().create(tableName);
+    // make the world's slowest scanner
+    final Scanner scanner = conn.createScanner(tableName, Constants.NO_AUTHS);
+    final IteratorSetting cfg = new IteratorSetting(100, SlowIterator.class);
+    SlowIterator.setSeekSleepTime(cfg, 99999*1000);
+    scanner.addScanIterator(cfg);
+    // create a thread to interrupt the slow scan
+    final Thread scanThread = Thread.currentThread();
+    Thread thread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          // ensure the scan is running: not perfect, the metadata tables could be scanned, too.
+          String tserver = conn.instanceOperations().getTabletServers().iterator().next();
+          List<ActiveScan> scans = null;
+          while (null == scans) {
+            try {
+              // Sometimes getting errors the first time around
+              scans = conn.instanceOperations().getActiveScans(tserver);
+            } catch (Exception e) {
+              log.warn("Could not connect to tserver " + tserver, e);
+            }
+          }
+          while (scans.size() < 1) {
+            UtilWaitThread.sleep(1000);
+            scans = conn.instanceOperations().getActiveScans(tserver);
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+        // BAM!
+        scanThread.interrupt();
+      }
+    };
+    thread.start();
+    try {
+      // Use the scanner, expect problems
+      for (@SuppressWarnings("unused") Entry<Key,Value> entry : scanner) {
+      }
+      Assert.fail("Scan should not succeed");
+    } catch (Exception ex) {
+    } finally {
+      thread.join();
+    }
+  }
+
+}