You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/10/24 22:20:50 UTC

[1/7] git commit: ACCUMULO-3257 Use a threadpool to ensure that a Node doesn't exceed maxSec.

Repository: accumulo
Updated Branches:
  refs/heads/1.6 e3a743cb4 -> 1635ad5c7
  refs/heads/master eaaebdf33 -> ebe6eecc7


ACCUMULO-3257 Use a threadpool to ensure that a Node doesn't exceed maxSec.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c4f3fc83
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c4f3fc83
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c4f3fc83

Branch: refs/heads/1.6
Commit: c4f3fc8317bd91aa02c71a405356545e1561d247
Parents: e3a743c
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 23 16:29:40 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Oct 24 15:24:28 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/test/randomwalk/Module.java | 419 +++++++++++--------
 1 file changed, 241 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4f3fc83/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
index c71d2d0..57b49ae 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
@@ -27,6 +27,12 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.xml.XMLConstants;
@@ -37,6 +43,7 @@ import javax.xml.validation.SchemaFactory;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.log4j.Level;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
@@ -46,15 +53,15 @@ import org.w3c.dom.NodeList;
  * A module is directed graph of tests
  */
 public class Module extends Node {
-  
+
   private class Dummy extends Node {
-    
+
     String name;
-    
+
     Dummy(String name) {
       this.name = name;
     }
-    
+
     @Override
     public void visit(State state, Properties props) {
       String print;
@@ -63,86 +70,88 @@ public class Module extends Node {
         log.log(level, name);
       }
     }
-    
+
+    @Override
     public String toString() {
       return name;
     }
   }
-  
+
   private class Alias extends Node {
-    
+
     Node target;
     String targetId;
     String id;
-    
+
     Alias(String id) {
       target = null;
       this.id = id;
     }
-    
+
     @Override
     public void visit(State state, Properties props) throws Exception {
       throw new Exception("You don't visit aliases!");
     }
-    
+
+    @Override
     public String toString() {
       return id;
     }
-    
+
     public void update(String node) throws Exception {
       targetId = node;
       target = getNode(node);
     }
-    
+
     public Node get() {
       return target;
     }
-    
+
     public String getTargetId() {
       return targetId;
     }
   }
-  
+
   private HashMap<String,Node> nodes = new HashMap<String,Node>();
   private HashMap<String,Properties> localProps = new HashMap<String,Properties>();
-  
+
   private class Edge {
     String nodeId;
     int weight;
   }
-  
+
   private class AdjList {
-    
+
     private List<Edge> edges = new ArrayList<Edge>();
     private int totalWeight = 0;
     private Random rand = new Random();
-    
+
     /**
      * Adds a neighbor node and weight of edge
      */
     private void addEdge(String nodeId, int weight) {
-      
+
       totalWeight += weight;
-      
+
       Edge e = new Edge();
       e.nodeId = nodeId;
       e.weight = weight;
       edges.add(e);
     }
-    
+
     /**
      * Chooses a random neighbor node
-     * 
+     *
      * @return Node or null if no edges
      */
     private String randomNeighbor() throws Exception {
-      
+
       String nodeId = null;
       rand = new Random();
-      
+
       int randNum = rand.nextInt(totalWeight) + 1;
       int sum = 0;
-      
+
       for (Edge e : edges) {
         nodeId = e.nodeId;
         sum += e.weight;
@@ -153,24 +162,24 @@ public class Module extends Node {
       return nodeId;
     }
   }
-  
+
   private HashMap<String,String> prefixes = new HashMap<String,String>();
   private HashMap<String,AdjList> adjMap = new HashMap<String,AdjList>();
   private HashMap<String,Set<String>> aliasMap = new HashMap<String,Set<String>>();
   private final File xmlFile;
   private String initNodeId;
   private Fixture fixture = null;
-  
+
   public Module(File xmlFile) throws Exception {
     this.xmlFile = xmlFile;
     loadFromXml();
   }
-  
+
   @Override
-  public void visit(State state, Properties props) throws Exception {
+  public void visit(final State state, Properties props) throws Exception {
     int maxHops, maxSec;
     boolean teardown;
-    
+
     Properties initProps = getProps("_init");
     initProps.putAll(props);
     String prop;
@@ -178,136 +187,190 @@ public class Module extends Node {
       maxHops = Integer.MAX_VALUE;
     else
       maxHops = Integer.parseInt(initProps.getProperty("maxHops", "0"));
-    
+
     if ((prop = initProps.getProperty("maxSec")) == null || prop.equals("0") || prop.equals(""))
       maxSec = Integer.MAX_VALUE;
     else
       maxSec = Integer.parseInt(initProps.getProperty("maxSec", "0"));
-    
+
     if ((prop = initProps.getProperty("teardown")) == null || prop.equals("true") || prop.equals(""))
       teardown = true;
     else
       teardown = false;
-    
+
     if (fixture != null) {
       fixture.setUp(state);
     }
-    
-    Node initNode = getNode(initNodeId);
-    
-    boolean test = false;
-    if (initNode instanceof Test) {
-      startTimer(initNode);
-      test = true;
-    }
-    initNode.visit(state, getProps(initNodeId));
-    if (test)
-      stopTimer(initNode);
-    
-    state.visitedNode();
-    // update aliases
-    Set<String> aliases;
-    if ((aliases = aliasMap.get(initNodeId)) != null)
-      for (String alias : aliases) {
-        ((Alias) nodes.get(alias)).update(initNodeId);
-      }
-    
-    String curNodeId = initNodeId;
-    int numHops = 0;
-    long startTime = System.currentTimeMillis() / 1000;
-    while (true) {
-      // check if END state was reached
-      if (curNodeId.equalsIgnoreCase("END")) {
-        log.debug("reached END state");
-        break;
-      }
-      // check if maxSec was reached
-      long curTime = System.currentTimeMillis() / 1000;
-      if ((curTime - startTime) > maxSec) {
-        log.debug("reached maxSec(" + maxSec + ")");
-        break;
-      }
-      // check if maxHops was reached
-      if (numHops > maxHops) {
-        log.debug("reached maxHops(" + maxHops + ")");
-        break;
-      }
-      numHops++;
-      
-      if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
-        throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")");
-      }
-      AdjList adj = adjMap.get(curNodeId);
-      String nextNodeId = adj.randomNeighbor();
-      Node nextNode = getNode(nextNodeId);
-      if (nextNode instanceof Alias) {
-        nextNodeId = ((Alias) nextNode).getTargetId();
-        nextNode = ((Alias) nextNode).get();
+
+    ExecutorService service = new SimpleThreadPool(1, "RandomWalk Runner");
+
+    try {
+      Node initNode = getNode(initNodeId);
+
+      boolean test = false;
+      if (initNode instanceof Test) {
+        startTimer(initNode);
+        test = true;
       }
-      Properties nodeProps = getProps(nextNodeId);
-      try {
-        test = false;
-        if (nextNode instanceof Test) {
-          startTimer(nextNode);
-          test = true;
+      initNode.visit(state, getProps(initNodeId));
+      if (test)
+        stopTimer(initNode);
+
+      state.visitedNode();
+      // update aliases
+      Set<String> aliases;
+      if ((aliases = aliasMap.get(initNodeId)) != null)
+        for (String alias : aliases) {
+          ((Alias) nodes.get(alias)).update(initNodeId);
         }
-        nextNode.visit(state, nodeProps);
-        if (test)
-          stopTimer(nextNode);
-      } catch (Exception e) {
-        log.debug("Connector belongs to user: " + state.getConnector().whoami());
-        log.debug("Exception occured at: " + System.currentTimeMillis());
-        log.debug("Properties for node: " + nextNodeId);
-        for (Entry<Object,Object> entry : nodeProps.entrySet()) {
-          log.debug("  " + entry.getKey() + ": " + entry.getValue());
+
+      String curNodeId = initNodeId;
+      int numHops = 0;
+      long startTime = System.currentTimeMillis() / 1000;
+      while (true) {
+        // check if END state was reached
+        if (curNodeId.equalsIgnoreCase("END")) {
+          log.debug("reached END state");
+          break;
         }
-        log.debug("Overall Properties");
-        for (Entry<Object,Object> entry : state.getProperties().entrySet()) {
-          log.debug("  " + entry.getKey() + ": " + entry.getValue());
+        // check if maxSec was reached
+        long curTime = System.currentTimeMillis() / 1000;
+        if ((curTime - startTime) > maxSec) {
+          log.debug("reached maxSec(" + maxSec + ")");
+          break;
         }
-        log.debug("State information");
-        for (String key : new TreeSet<String>(state.getMap().keySet()))  {
-          Object value = state.getMap().get(key);
-          String logMsg = "  " + key + ": ";
-          if (value == null)
-            logMsg += "null";
-          else if (value instanceof String || value instanceof Map || value instanceof Collection || value instanceof Number)
-            logMsg += value;
-          else if (value instanceof byte[])
-            logMsg += new String((byte[])value, Constants.UTF8);
-          else if (value instanceof PasswordToken)
-            logMsg += new String(((PasswordToken) value).getPassword(), Constants.UTF8);
-          else
-            logMsg += value.getClass()+ " - " + value;
-          
-          log.debug(logMsg);
+
+        // The number of seconds before the test should exit
+        long secondsRemaining = maxSec - (curTime - startTime);
+
+        // check if maxHops was reached
+        if (numHops > maxHops) {
+          log.debug("reached maxHops(" + maxHops + ")");
+          break;
         }
-        throw new Exception("Error running node " + nextNodeId, e);
-      }
-      state.visitedNode();
-      
-      // update aliases
-      if ((aliases = aliasMap.get(curNodeId)) != null)
-        for (String alias : aliases) {
-          ((Alias) nodes.get(alias)).update(curNodeId);
+        numHops++;
+
+        if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
+          throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")");
+        }
+        AdjList adj = adjMap.get(curNodeId);
+        String nextNodeId = adj.randomNeighbor();
+        final Node nextNode;
+        Node nextNodeOrAlias = getNode(nextNodeId);
+        if (nextNodeOrAlias instanceof Alias) {
+          nextNodeId = ((Alias) nextNodeOrAlias).getTargetId();
+          nextNode = ((Alias) nextNodeOrAlias).get();
+        } else {
+          nextNode = nextNodeOrAlias;
+        }
+        final Properties nodeProps = getProps(nextNodeId);
+        try {
+          test = false;
+          if (nextNode instanceof Test) {
+            startTimer(nextNode);
+            test = true;
+          }
+
+          // Wrap the visit of the next node in the module in a callable that returns a thrown exception
+          FutureTask<Exception> task = new FutureTask<Exception>(new Callable<Exception>() {
+
+            @Override
+            public Exception call() throws Exception {
+              try {
+                nextNode.visit(state, nodeProps);
+                return null;
+              } catch (Exception e) {
+                return e;
+              }
+            }
+
+          });
+
+          // Run the task (should execute immediately)
+          service.submit(task);
+
+          Exception nodeException;
+          try {
+            // Bound the time we'll wait for the node to complete
+            nodeException = task.get(secondsRemaining, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            log.warn("Interrupted waiting for RandomWalk node to complete. Exiting.", e);
+            break;
+          } catch (ExecutionException e) {
+            log.error("Caught error executing RandomWalk node", e);
+            throw e;
+          } catch (TimeoutException e) {
+            log.info("Timed out waiting for RandomWalk node to complete (waited " + secondsRemaining + " seconds). Exiting.", e);
+            break;
+          }
+
+          // The RandomWalk node throw an Exception that that Callable handed back
+          // Throw it and let the Module perform cleanup
+          if (null != nodeException) {
+            throw nodeException;
+          }
+
+          if (test)
+            stopTimer(nextNode);
+        } catch (Exception e) {
+          log.debug("Connector belongs to user: " + state.getConnector().whoami());
+          log.debug("Exception occured at: " + System.currentTimeMillis());
+          log.debug("Properties for node: " + nextNodeId);
+          for (Entry<Object,Object> entry : nodeProps.entrySet()) {
+            log.debug("  " + entry.getKey() + ": " + entry.getValue());
+          }
+          log.debug("Overall Properties");
+          for (Entry<Object,Object> entry : state.getProperties().entrySet()) {
+            log.debug("  " + entry.getKey() + ": " + entry.getValue());
+          }
+          log.debug("State information");
+          for (String key : new TreeSet<String>(state.getMap().keySet())) {
+            Object value = state.getMap().get(key);
+            String logMsg = "  " + key + ": ";
+            if (value == null)
+              logMsg += "null";
+            else if (value instanceof String || value instanceof Map || value instanceof Collection || value instanceof Number)
+              logMsg += value;
+            else if (value instanceof byte[])
+              logMsg += new String((byte[]) value, Constants.UTF8);
+            else if (value instanceof PasswordToken)
+              logMsg += new String(((PasswordToken) value).getPassword(), Constants.UTF8);
+            else
+              logMsg += value.getClass() + " - " + value;
+
+            log.debug(logMsg);
+          }
+          throw new Exception("Error running node " + nextNodeId, e);
         }
-      
-      curNodeId = nextNodeId;
+        state.visitedNode();
+
+        // update aliases
+        if ((aliases = aliasMap.get(curNodeId)) != null)
+          for (String alias : aliases) {
+            ((Alias) nodes.get(alias)).update(curNodeId);
+          }
+
+        curNodeId = nextNodeId;
+      }
+    } finally {
+      if (null != service) {
+        service.shutdownNow();
+      }
     }
-    
+
     if (teardown && (fixture != null)) {
       log.debug("tearing down module");
       fixture.tearDown(state);
     }
   }
-  
+
   Thread timer = null;
   final int time = 5 * 1000 * 60;
   AtomicBoolean runningLong = new AtomicBoolean(false);
   long systemTime;
-  
+
   /**
-   * 
+   *
    */
   private void startTimer(final Node initNode) {
     runningLong.set(false);
@@ -330,9 +393,9 @@ public class Module extends Node {
     initNode.makingProgress();
     timer.start();
   }
-  
+
   /**
-   * 
+   *
    */
   private void stopTimer(Node nextNode) {
     synchronized (timer) {
@@ -346,31 +409,31 @@ public class Module extends Node {
     if (runningLong.get())
       log.warn("Node " + nextNode + ", which was running long, has now completed after " + (System.currentTimeMillis() - systemTime) / 1000.0 + " seconds");
   }
-  
+
   @Override
   public String toString() {
     return xmlFile.toString();
   }
-  
+
   private String getFullName(String name) {
-    
+
     int index = name.indexOf(".");
     if (index == -1 || name.endsWith(".xml")) {
       return name;
     }
-    
+
     String id = name.substring(0, index);
-    
+
     if (!prefixes.containsKey(id)) {
       log.warn("Id (" + id + ") was not found in prefixes");
       return name;
     }
-    
+
     return prefixes.get(id).concat(name.substring(index + 1));
   }
-  
+
   private Node createNode(String id, String src) throws Exception {
-    
+
     // check if id indicates dummy node
     if (id.equalsIgnoreCase("END") || id.startsWith("dummy")) {
       if (nodes.containsKey(id) == false) {
@@ -378,14 +441,14 @@ public class Module extends Node {
       }
       return nodes.get(id);
     }
-    
+
     if (id.startsWith("alias")) {
       if (nodes.containsKey(id) == false) {
         nodes.put(id, new Alias(id));
       }
       return nodes.get(id);
     }
-    
+
     // grab node from framework based on its id or src
     Node node;
     if (src == null || src.isEmpty()) {
@@ -393,44 +456,44 @@ public class Module extends Node {
     } else {
       node = Framework.getInstance().getNode(getFullName(src));
     }
-    
+
     // add to node to this module's map
     nodes.put(id, node);
-    
+
     return node;
   }
-  
+
   private Node getNode(String id) throws Exception {
-    
+
     if (nodes.containsKey(id)) {
       return nodes.get(id);
     }
-    
+
     if (id.equalsIgnoreCase("END")) {
       nodes.put(id, new Dummy(id));
       return nodes.get(id);
     }
-    
+
     return Framework.getInstance().getNode(getFullName(id));
   }
-  
+
   private Properties getProps(String nodeId) {
     if (localProps.containsKey(nodeId)) {
       return localProps.get(nodeId);
     }
     return new Properties();
   }
-  
+
   private void loadFromXml() throws Exception {
     DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
     DocumentBuilder docbuilder;
     Document d = null;
-    
+
     // set the schema
     SchemaFactory sf = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
     Schema moduleSchema = sf.newSchema(this.getClass().getClassLoader().getResource("randomwalk/module.xsd"));
     dbf.setSchema(moduleSchema);
-    
+
     // parse the document
     try {
       docbuilder = dbf.newDocumentBuilder();
@@ -439,7 +502,7 @@ public class Module extends Node {
       log.error("Failed to parse: " + xmlFile, e);
       throw new Exception("Failed to parse: " + xmlFile);
     }
-    
+
     // parse packages
     NodeList nodelist = d.getDocumentElement().getElementsByTagName("package");
     for (int i = 0; i < nodelist.getLength(); i++) {
@@ -450,91 +513,91 @@ public class Module extends Node {
       }
       prefixes.put(el.getAttribute("prefix"), value);
     }
-    
+
     // parse fixture node
     nodelist = d.getDocumentElement().getElementsByTagName("fixture");
     if (nodelist.getLength() > 0) {
       Element fixtureEl = (Element) nodelist.item(0);
       fixture = (Fixture) Class.forName(getFullName(fixtureEl.getAttribute("id"))).newInstance();
     }
-    
+
     // parse initial node
     Element initEl = (Element) d.getDocumentElement().getElementsByTagName("init").item(0);
     initNodeId = initEl.getAttribute("id");
     Properties initProps = new Properties();
     String attr = initEl.getAttribute("maxHops");
-    
+
     if (attr != null)
       initProps.setProperty("maxHops", attr);
     attr = initEl.getAttribute("maxSec");
-    
+
     if (attr != null)
       initProps.setProperty("maxSec", attr);
     attr = initEl.getAttribute("teardown");
-    
+
     if (attr != null)
       initProps.setProperty("teardown", attr);
     localProps.put("_init", initProps);
-    
+
     // parse all nodes
     nodelist = d.getDocumentElement().getElementsByTagName("node");
     for (int i = 0; i < nodelist.getLength(); i++) {
-      
+
       Element nodeEl = (Element) nodelist.item(i);
-      
+
       // get attributes
       String id = nodeEl.getAttribute("id");
       if (adjMap.containsKey(id)) {
         throw new Exception("Module already contains: " + id);
       }
       String src = nodeEl.getAttribute("src");
-      
+
       // create node
       createNode(id, src);
-      
+
       // set some attributes in properties for later use
       Properties props = new Properties();
       props.setProperty("maxHops", nodeEl.getAttribute("maxHops"));
       props.setProperty("maxSec", nodeEl.getAttribute("maxSec"));
       props.setProperty("teardown", nodeEl.getAttribute("teardown"));
-      
+
       // parse aliases
       NodeList aliaslist = nodeEl.getElementsByTagName("alias");
       Set<String> aliases = new TreeSet<String>();
       for (int j = 0; j < aliaslist.getLength(); j++) {
         Element propEl = (Element) aliaslist.item(j);
-        
+
         if (!propEl.hasAttribute("name")) {
           throw new Exception("Node " + id + " has alias with no identifying name");
         }
-        
+
         String key = "alias." + propEl.getAttribute("name");
-        
+
         aliases.add(key);
         createNode(key, null);
       }
       if (aliases.size() > 0)
         aliasMap.put(id, aliases);
-      
+
       // parse properties of nodes
       NodeList proplist = nodeEl.getElementsByTagName("property");
       for (int j = 0; j < proplist.getLength(); j++) {
         Element propEl = (Element) proplist.item(j);
-        
+
         if (!propEl.hasAttribute("key") || !propEl.hasAttribute("value")) {
           throw new Exception("Node " + id + " has property with no key or value");
         }
-        
+
         String key = propEl.getAttribute("key");
-        
+
         if (key.equals("maxHops") || key.equals("maxSec") || key.equals("teardown")) {
           throw new Exception("The following property can only be set in attributes: " + key);
         }
-        
+
         props.setProperty(key, propEl.getAttribute("value"));
       }
       localProps.put(id, props);
-      
+
       // parse edges of nodes
       AdjList edges = new AdjList();
       adjMap.put(id, edges);
@@ -544,13 +607,13 @@ public class Module extends Node {
       }
       for (int j = 0; j < edgelist.getLength(); j++) {
         Element edgeEl = (Element) edgelist.item(j);
-        
+
         String edgeID = edgeEl.getAttribute("id");
-        
+
         if (!edgeEl.hasAttribute("weight")) {
           throw new Exception("Edge with id=" + edgeID + " is missing weight");
         }
-        
+
         int weight = Integer.parseInt(edgeEl.getAttribute("weight"));
         edges.addEdge(edgeID, weight);
       }


[7/7] git commit: Merge branch '1.6'

Posted by el...@apache.org.
Merge branch '1.6'

Conflicts:
	test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
	test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
	test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ebe6eecc
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ebe6eecc
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ebe6eecc

Branch: refs/heads/master
Commit: ebe6eecc749e4456c7e53e232d61d7f9d73f86ed
Parents: eaaebdf 1635ad5
Author: Josh Elser <el...@apache.org>
Authored: Fri Oct 24 16:20:27 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Oct 24 16:20:27 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/test/randomwalk/Module.java | 418 +++++++++++--------
 .../test/randomwalk/bulk/BulkImportTest.java    |  85 ++++
 .../test/randomwalk/bulk/BulkMinusOne.java      |   8 +-
 .../test/randomwalk/bulk/BulkPlusOne.java       |  20 +-
 .../accumulo/test/randomwalk/bulk/BulkTest.java |   8 +-
 .../accumulo/test/randomwalk/bulk/Compact.java  |   6 +-
 .../test/randomwalk/bulk/ConsistencyCheck.java  |   6 +-
 .../accumulo/test/randomwalk/bulk/Merge.java    |  12 +-
 .../test/randomwalk/bulk/SelectiveBulkTest.java |  42 ++
 .../test/randomwalk/bulk/SelectiveQueueing.java |  49 +++
 .../accumulo/test/randomwalk/bulk/Split.java    |   6 +-
 11 files changed, 450 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
index adf9b04,5756934..353a02b
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
@@@ -36,9 -41,10 +42,10 @@@ import javax.xml.parsers.DocumentBuilde
  import javax.xml.validation.Schema;
  import javax.xml.validation.SchemaFactory;
  
 -import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+ import org.apache.accumulo.core.util.SimpleThreadPool;
  import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
  import org.w3c.dom.Document;
  import org.w3c.dom.Element;
  import org.w3c.dom.NodeList;
@@@ -47,19 -53,17 +54,20 @@@
   * A module is directed graph of tests
   */
  public class Module extends Node {
-   
+ 
 +  private static final Logger log = Logger.getLogger(Module.class);
-   
++
++
    private class Dummy extends Node {
-     
+ 
      String name;
-     
+ 
      Dummy(String name) {
        this.name = name;
      }
-     
+ 
      @Override
 -    public void visit(State state, Properties props) {
 +    public void visit(State state, Environment env, Properties props) {
        String print;
        if ((print = props.getProperty("print")) != null) {
          Level level = Level.toLevel(print);
@@@ -82,12 -87,13 +91,13 @@@
        target = null;
        this.id = id;
      }
-     
+ 
      @Override
 -    public void visit(State state, Properties props) throws Exception {
 +    public void visit(State state, Environment env, Properties props) throws Exception {
        throw new Exception("You don't visit aliases!");
      }
-     
+ 
+     @Override
      public String toString() {
        return id;
      }
@@@ -168,12 -174,12 +178,12 @@@
      this.xmlFile = xmlFile;
      loadFromXml();
    }
-   
+ 
    @Override
-   public void visit(State state, Environment env, Properties props) throws Exception {
 -  public void visit(final State state, Properties props) throws Exception {
++  public void visit(final State state, final Environment env, Properties props) throws Exception {
      int maxHops, maxSec;
      boolean teardown;
-     
+ 
      Properties initProps = getProps("_init");
      initProps.putAll(props);
      String prop;
@@@ -191,117 -197,173 +201,171 @@@
        teardown = true;
      else
        teardown = false;
-     
+ 
      if (fixture != null) {
 -      fixture.setUp(state);
 +      fixture.setUp(state, env);
      }
-     
-     Node initNode = getNode(initNodeId);
-     
-     boolean test = false;
-     if (initNode instanceof Test) {
-       startTimer(initNode);
-       test = true;
-     }
-     initNode.visit(state, env, getProps(initNodeId));
-     if (test)
-       stopTimer(initNode);
-     
-     // update aliases
-     Set<String> aliases;
-     if ((aliases = aliasMap.get(initNodeId)) != null)
-       for (String alias : aliases) {
-         ((Alias) nodes.get(alias)).update(initNodeId);
-       }
-     
-     String curNodeId = initNodeId;
-     int numHops = 0;
-     long startTime = System.currentTimeMillis() / 1000;
-     while (true) {
-       // check if END state was reached
-       if (curNodeId.equalsIgnoreCase("END")) {
-         log.debug("reached END state");
-         break;
-       }
-       // check if maxSec was reached
-       long curTime = System.currentTimeMillis() / 1000;
-       if ((curTime - startTime) > maxSec) {
-         log.debug("reached maxSec(" + maxSec + ")");
-         break;
-       }
-       // check if maxHops was reached
-       if (numHops > maxHops) {
-         log.debug("reached maxHops(" + maxHops + ")");
-         break;
-       }
-       numHops++;
-       
-       if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
-         throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")");
-       }
-       AdjList adj = adjMap.get(curNodeId);
-       String nextNodeId = adj.randomNeighbor();
-       Node nextNode = getNode(nextNodeId);
-       if (nextNode instanceof Alias) {
-         nextNodeId = ((Alias) nextNode).getTargetId();
-         nextNode = ((Alias) nextNode).get();
+ 
+     ExecutorService service = new SimpleThreadPool(1, "RandomWalk Runner");
+ 
+     try {
+       Node initNode = getNode(initNodeId);
+ 
+       boolean test = false;
+       if (initNode instanceof Test) {
+         startTimer(initNode);
+         test = true;
        }
-       Properties nodeProps = getProps(nextNodeId);
-       try {
-         test = false;
-         if (nextNode instanceof Test) {
-           startTimer(nextNode);
-           test = true;
 -      initNode.visit(state, getProps(initNodeId));
++      initNode.visit(state, env, getProps(initNodeId));
+       if (test)
+         stopTimer(initNode);
+ 
 -      state.visitedNode();
+       // update aliases
+       Set<String> aliases;
+       if ((aliases = aliasMap.get(initNodeId)) != null)
+         for (String alias : aliases) {
+           ((Alias) nodes.get(alias)).update(initNodeId);
          }
-         nextNode.visit(state, env, nodeProps);
-         if (test)
-           stopTimer(nextNode);
-       } catch (Exception e) {
-         log.debug("Connector belongs to user: " + env.getConnector().whoami());
-         log.debug("Exception occured at: " + System.currentTimeMillis());
-         log.debug("Properties for node: " + nextNodeId);
-         for (Entry<Object,Object> entry : nodeProps.entrySet()) {
-           log.debug("  " + entry.getKey() + ": " + entry.getValue());
+ 
+       String curNodeId = initNodeId;
+       int numHops = 0;
+       long startTime = System.currentTimeMillis() / 1000;
+       while (true) {
+         // check if END state was reached
+         if (curNodeId.equalsIgnoreCase("END")) {
+           log.debug("reached END state");
+           break;
          }
-         log.debug("Overall Configuration Properties");
-         for (Entry<Object,Object> entry : env.copyConfigProperties().entrySet()) {
-           log.debug("  " + entry.getKey() + ": " + entry.getValue());
+         // check if maxSec was reached
+         long curTime = System.currentTimeMillis() / 1000;
+         if ((curTime - startTime) > maxSec) {
+           log.debug("reached maxSec(" + maxSec + ")");
+           break;
          }
-         log.debug("State information");
-         for (String key : new TreeSet<String>(state.getMap().keySet()))  {
-           Object value = state.getMap().get(key);
-           String logMsg = "  " + key + ": ";
-           if (value == null)
-             logMsg += "null";
-           else if (value instanceof String || value instanceof Map || value instanceof Collection || value instanceof Number)
-             logMsg += value;
-           else if (value instanceof byte[])
-             logMsg += new String((byte[])value, StandardCharsets.UTF_8);
-           else if (value instanceof PasswordToken)
-             logMsg += new String(((PasswordToken) value).getPassword(), StandardCharsets.UTF_8);
-           else
-             logMsg += value.getClass()+ " - " + value;
-           
-           log.debug(logMsg);
+ 
+         // The number of seconds before the test should exit
+         long secondsRemaining = maxSec - (curTime - startTime);
+ 
+         // check if maxHops was reached
+         if (numHops > maxHops) {
+           log.debug("reached maxHops(" + maxHops + ")");
+           break;
          }
-         throw new Exception("Error running node " + nextNodeId, e);
-       }
-       
-       // update aliases
-       if ((aliases = aliasMap.get(curNodeId)) != null)
-         for (String alias : aliases) {
-           ((Alias) nodes.get(alias)).update(curNodeId);
+         numHops++;
+ 
+         if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
+           throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")");
+         }
+         AdjList adj = adjMap.get(curNodeId);
+         String nextNodeId = adj.randomNeighbor();
+         final Node nextNode;
+         Node nextNodeOrAlias = getNode(nextNodeId);
+         if (nextNodeOrAlias instanceof Alias) {
+           nextNodeId = ((Alias) nextNodeOrAlias).getTargetId();
+           nextNode = ((Alias) nextNodeOrAlias).get();
+         } else {
+           nextNode = nextNodeOrAlias;
+         }
+         final Properties nodeProps = getProps(nextNodeId);
+         try {
+           test = false;
+           if (nextNode instanceof Test) {
+             startTimer(nextNode);
+             test = true;
+           }
+ 
+           // Wrap the visit of the next node in the module in a callable that returns a thrown exception
+           FutureTask<Exception> task = new FutureTask<Exception>(new Callable<Exception>() {
+ 
+             @Override
+             public Exception call() throws Exception {
+               try {
 -                nextNode.visit(state, nodeProps);
++                nextNode.visit(state, env, nodeProps);
+                 return null;
+               } catch (Exception e) {
+                 return e;
+               }
+             }
+ 
+           });
+ 
+           // Run the task (should execute immediately)
+           service.submit(task);
+ 
+           Exception nodeException;
+           try {
+             // Bound the time we'll wait for the node to complete
+             nodeException = task.get(secondsRemaining, TimeUnit.SECONDS);
+           } catch (InterruptedException e) {
+             log.warn("Interrupted waiting for " + nextNode.getClass().getSimpleName() + " to complete. Exiting.", e);
+             break;
+           } catch (ExecutionException e) {
+             log.error("Caught error executing " + nextNode.getClass().getSimpleName(), e);
+             throw e;
+           } catch (TimeoutException e) {
+             log.info("Timed out waiting for " + nextNode.getClass().getSimpleName() + " to complete (waited " + secondsRemaining + " seconds). Exiting.", e);
+             break;
+           }
+ 
+           // The RandomWalk node throw an Exception that that Callable handed back
+           // Throw it and let the Module perform cleanup
+           if (null != nodeException) {
+             throw nodeException;
+           }
+ 
+           if (test)
+             stopTimer(nextNode);
+         } catch (Exception e) {
 -          log.debug("Connector belongs to user: " + state.getConnector().whoami());
++          log.debug("Connector belongs to user: " + env.getConnector().whoami());
+           log.debug("Exception occured at: " + System.currentTimeMillis());
+           log.debug("Properties for node: " + nextNodeId);
+           for (Entry<Object,Object> entry : nodeProps.entrySet()) {
+             log.debug("  " + entry.getKey() + ": " + entry.getValue());
+           }
 -          log.debug("Overall Properties");
 -          for (Entry<Object,Object> entry : state.getProperties().entrySet()) {
++          log.debug("Overall Configuration Properties");
++          for (Entry<Object,Object> entry : env.copyConfigProperties().entrySet()) {
+             log.debug("  " + entry.getKey() + ": " + entry.getValue());
+           }
+           log.debug("State information");
+           for (String key : new TreeSet<String>(state.getMap().keySet())) {
+             Object value = state.getMap().get(key);
+             String logMsg = "  " + key + ": ";
+             if (value == null)
+               logMsg += "null";
+             else if (value instanceof String || value instanceof Map || value instanceof Collection || value instanceof Number)
+               logMsg += value;
+             else if (value instanceof byte[])
 -              logMsg += new String((byte[]) value, Constants.UTF8);
++              logMsg += new String((byte[]) value, StandardCharsets.UTF_8);
+             else if (value instanceof PasswordToken)
 -              logMsg += new String(((PasswordToken) value).getPassword(), Constants.UTF8);
++              logMsg += new String(((PasswordToken) value).getPassword(), StandardCharsets.UTF_8);
+             else
+               logMsg += value.getClass() + " - " + value;
+ 
+             log.debug(logMsg);
+           }
+           throw new Exception("Error running node " + nextNodeId, e);
          }
-       
-       curNodeId = nextNodeId;
 -        state.visitedNode();
+ 
+         // update aliases
+         if ((aliases = aliasMap.get(curNodeId)) != null)
+           for (String alias : aliases) {
+             ((Alias) nodes.get(alias)).update(curNodeId);
+           }
+ 
+         curNodeId = nextNodeId;
+       }
+     } finally {
+       if (null != service) {
+         service.shutdownNow();
+       }
      }
-     
+ 
      if (teardown && (fixture != null)) {
        log.debug("tearing down module");
 -      fixture.tearDown(state);
 +      fixture.tearDown(state, env);
      }
    }
-   
+ 
    Thread timer = null;
    final int time = 5 * 1000 * 60;
    AtomicBoolean runningLong = new AtomicBoolean(false);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
index 0000000,aa44741..33d7701
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
@@@ -1,0 -1,84 +1,85 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.test.randomwalk.bulk;
+ 
+ import java.util.Properties;
+ 
++import org.apache.accumulo.test.randomwalk.Environment;
+ import org.apache.accumulo.test.randomwalk.State;
+ 
+ /**
+  * If we have a sufficient back-up of imports, let them work off before adding even more bulk-imports. Imports of PlusOne must always be balanced with imports
+  * of MinusOne.
+  */
+ public abstract class BulkImportTest extends BulkTest {
+ 
+   public static final String SKIPPED_IMPORT = "skipped.import", TRUE = Boolean.TRUE.toString(), FALSE = Boolean.FALSE.toString();
+ 
+   @Override
 -  public void visit(final State state, Properties props) throws Exception {
++  public void visit(final State state, Environment env, Properties props) throws Exception {
+     /**
+      * Each visit() is performed sequentially and then submitted to the threadpool which will have async execution. As long as we're checking the state and
+      * making decisions about what to do before we submit something to the thread pool, we're fine.
+      */
+ 
+     String lastImportSkipped = state.getString(SKIPPED_IMPORT);
+     // We have a marker in the state for the previous insert, we have to balance skipping BulkPlusOne
+     // with skipping the new BulkMinusOne to make sure that we maintain consistency
+     if (null != lastImportSkipped) {
+       if (!getClass().equals(BulkMinusOne.class)) {
+         throw new IllegalStateException("Should not have a skipped import marker for a class other than " + BulkMinusOne.class.getName() + " but was "
+             + getClass().getName());
+       }
+ 
+       if (TRUE.equals(lastImportSkipped)) {
+         log.debug("Last import was skipped, skipping this import to ensure consistency");
+         state.remove(SKIPPED_IMPORT);
+ 
+         // Wait 30s to balance the skip of a BulkPlusOne/BulkMinusOne pair
+         log.debug("Waiting 30s before continuing");
+         try {
+           Thread.sleep(30 * 1000);
+         } catch (InterruptedException e) {}
+ 
+         return;
+       } else {
+         // last import was not skipped, remove the marker
+         state.remove(SKIPPED_IMPORT);
+       }
+     }
+ 
 -    if (shouldQueueMoreImports(state)) {
 -      super.visit(state, props);
++    if (shouldQueueMoreImports(state, env)) {
++      super.visit(state, env, props);
+     } else {
+       log.debug("Not queuing more imports this round because too many are already queued");
+       state.set(SKIPPED_IMPORT, TRUE);
+       // Don't sleep here, let the sleep happen when we skip the next BulkMinusOne
+     }
+   }
+ 
 -  private boolean shouldQueueMoreImports(State state) throws Exception {
++  private boolean shouldQueueMoreImports(State state, Environment env) throws Exception {
+     // Only selectively import when it's BulkPlusOne. If we did a BulkPlusOne,
+     // we must also do a BulkMinusOne to keep the table consistent
+     if (getClass().equals(BulkPlusOne.class)) {
+       // Only queue up more imports if the number of queued tasks already
+       // exceeds the number of tservers by 50x
 -      return SelectiveQueueing.shouldQueueOperation(state);
++      return SelectiveQueueing.shouldQueueOperation(state, env);
+     }
+ 
+     return true;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
index f6f2e87,1704e49..6338d29
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
@@@ -16,20 -16,18 +16,20 @@@
   */
  package org.apache.accumulo.test.randomwalk.bulk;
  
 -import org.apache.accumulo.core.Constants;
 +import java.nio.charset.StandardCharsets;
 +
  import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.test.randomwalk.Environment;
  import org.apache.accumulo.test.randomwalk.State;
  
- public class BulkMinusOne extends BulkTest {
-   
+ public class BulkMinusOne extends BulkImportTest {
+ 
 -  private static final Value negOne = new Value("-1".getBytes(Constants.UTF8));
 +  private static final Value negOne = new Value("-1".getBytes(StandardCharsets.UTF_8));
-   
+ 
    @Override
 -  protected void runLater(State state) throws Exception {
 +  protected void runLater(State state, Environment env) throws Exception {
      log.info("Decrementing");
 -    BulkPlusOne.bulkLoadLots(log, state, negOne);
 +    BulkPlusOne.bulkLoadLots(log, state, env, negOne);
    }
-   
+ 
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
index d605e8e,6d56f13..c54a8e7
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
@@@ -54,10 -53,10 +54,10 @@@ public class BulkPlusOne extends BulkIm
    }
    public static final Text MARKER_CF = new Text("marker");
    static final AtomicLong counter = new AtomicLong();
-   
+ 
    private static final Value ONE = new Value("1".getBytes());
  
 -  static void bulkLoadLots(Logger log, State state, Value value) throws Exception {
 +  static void bulkLoadLots(Logger log, State state, Environment env, Value value) throws Exception {
      final Path dir = new Path("/tmp", "bulk_" + UUID.randomUUID().toString());
      final Path fail = new Path(dir.toString() + "_fail");
      final DefaultConfiguration defaultConfiguration = AccumuloConfiguration.getDefaultConfiguration();
@@@ -106,11 -105,11 +106,11 @@@
      fs.delete(fail, true);
      log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
    }
-   
+ 
    @Override
 -  protected void runLater(State state) throws Exception {
 +  protected void runLater(State state, Environment env) throws Exception {
      log.info("Incrementing");
 -    bulkLoadLots(log, state, ONE);
 +    bulkLoadLots(log, state, env, ONE);
    }
-   
+ 
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
index 6c0c68e,b24f61a..07d1f4c
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
@@@ -23,9 -22,9 +23,9 @@@ import org.apache.accumulo.test.randomw
  import org.apache.accumulo.test.randomwalk.Test;
  
  public abstract class BulkTest extends Test {
-   
+ 
    @Override
 -  public void visit(final State state, Properties props) throws Exception {
 +  public void visit(final State state, final Environment env, Properties props) throws Exception {
      Setup.run(state, new Runnable() {
        @Override
        public void run() {
@@@ -35,10 -34,10 +35,10 @@@
            log.error(ex, ex);
          }
        }
-       
+ 
      });
    }
-   
+ 
 -  abstract protected void runLater(State state) throws Exception;
 +  abstract protected void runLater(State state, Environment env) throws Exception;
-   
+ 
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
index 7561709,8b17256..c526ffa
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
@@@ -20,15 -19,15 +20,15 @@@ import org.apache.accumulo.test.randomw
  import org.apache.accumulo.test.randomwalk.State;
  import org.apache.hadoop.io.Text;
  
- public class Compact extends BulkTest {
-   
+ public class Compact extends SelectiveBulkTest {
+ 
    @Override
 -  protected void runLater(State state) throws Exception {
 +  protected void runLater(State state, Environment env) throws Exception {
      final Text[] points = Merge.getRandomTabletRange(state);
      final String rangeString = Merge.rangeToString(points);
      log.info("Compacting " + rangeString);
 -    state.getConnector().tableOperations().compact(Setup.getTableName(), points[0], points[1], false, true);
 +    env.getConnector().tableOperations().compact(Setup.getTableName(), points[0], points[1], false, true);
      log.info("Compaction " + rangeString + " finished");
    }
-   
+ 
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
index d1ff8cb,7e528a7..39ef3d8
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
@@@ -29,10 -28,10 +29,10 @@@ import org.apache.accumulo.test.randomw
  import org.apache.accumulo.test.randomwalk.State;
  import org.apache.hadoop.io.Text;
  
- public class ConsistencyCheck extends BulkTest {
-   
+ public class ConsistencyCheck extends SelectiveBulkTest {
+ 
    @Override
 -  protected void runLater(State state) throws Exception {
 +  protected void runLater(State state, Environment env) throws Exception {
      Random rand = (Random) state.get("rand");
      Text row = Merge.getRandomRow(rand);
      log.info("Checking " + row);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
index 04af6c5,9d66e0c..8508242
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
@@@ -23,16 -22,16 +23,16 @@@ import org.apache.accumulo.test.randomw
  import org.apache.accumulo.test.randomwalk.State;
  import org.apache.hadoop.io.Text;
  
- public class Merge extends BulkTest {
-   
+ public class Merge extends SelectiveBulkTest {
+ 
    @Override
 -  protected void runLater(State state) throws Exception {
 +  protected void runLater(State state, Environment env) throws Exception {
      Text[] points = getRandomTabletRange(state);
      log.info("merging " + rangeToString(points));
 -    state.getConnector().tableOperations().merge(Setup.getTableName(), points[0], points[1]);
 +    env.getConnector().tableOperations().merge(Setup.getTableName(), points[0], points[1]);
      log.info("merging " + rangeToString(points) + " complete");
    }
-   
+ 
    public static String rangeToString(Text[] points) {
      return "(" + (points[0] == null ? "-inf" : points[0]) + " -> " + (points[1] == null ? "+inf" : points[1]) + "]";
    }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
index 0000000,ca66775..48bf86a
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
@@@ -1,0 -1,41 +1,42 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.test.randomwalk.bulk;
+ 
+ import java.util.Properties;
+ 
++import org.apache.accumulo.test.randomwalk.Environment;
+ import org.apache.accumulo.test.randomwalk.State;
+ 
+ /**
+  * Selectively runs the actual {@link BulkTest} based on the number of active TServers and the number of queued operations.
+  */
+ public abstract class SelectiveBulkTest extends BulkTest {
+ 
+   @Override
 -  public void visit(State state, Properties props) throws Exception {
 -    if (SelectiveQueueing.shouldQueueOperation(state)) {
 -      super.visit(state, props);
++  public void visit(State state, Environment env, Properties props) throws Exception {
++    if (SelectiveQueueing.shouldQueueOperation(state, env)) {
++      super.visit(state, env, props);
+     } else {
+       log.debug("Skipping queueing of " + getClass().getSimpleName() + " because of excessive queued tasks already");
+       log.debug("Waiting 30 seconds before continuing");
+       try {
+         Thread.sleep(30 * 1000);
+       } catch (InterruptedException e) {}
+     }
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
index 0000000,e6dde4d..c7f490c
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
@@@ -1,0 -1,48 +1,49 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.test.randomwalk.bulk;
+ 
+ import java.util.concurrent.ThreadPoolExecutor;
+ 
+ import org.apache.accumulo.core.client.Connector;
++import org.apache.accumulo.test.randomwalk.Environment;
+ import org.apache.accumulo.test.randomwalk.State;
+ import org.apache.log4j.Logger;
+ 
+ /**
+  * Chooses whether or not an operation should be queued based on the current thread pool queue length and the number of available TServers.
+  */
+ public class SelectiveQueueing {
+   private static final Logger log = Logger.getLogger(SelectiveQueueing.class);
+ 
 -  public static boolean shouldQueueOperation(State state) throws Exception {
++  public static boolean shouldQueueOperation(State state, Environment env) throws Exception {
+     final ThreadPoolExecutor pool = (ThreadPoolExecutor) state.get("pool");
+     long queuedThreads = pool.getTaskCount() - pool.getActiveCount() - pool.getCompletedTaskCount();
 -    final Connector conn = state.getConnector();
++    final Connector conn = env.getConnector();
+     int numTservers = conn.instanceOperations().getTabletServers().size();
+ 
+     if (!shouldQueue(queuedThreads, numTservers)) {
+       log.info("Not queueing because of " + queuedThreads + " outstanding tasks");
+       return false;
+     }
+ 
+     return true;
+   }
+ 
+   private static boolean shouldQueue(long queuedThreads, int numTservers) {
+     return queuedThreads < numTservers * 50;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ebe6eecc/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
index 21b453d,7a93321..b69805d
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
@@@ -24,18 -23,18 +24,18 @@@ import org.apache.accumulo.test.randomw
  import org.apache.accumulo.test.randomwalk.State;
  import org.apache.hadoop.io.Text;
  
- public class Split extends BulkTest {
-   
+ public class Split extends SelectiveBulkTest {
+ 
    @Override
 -  protected void runLater(State state) throws Exception {
 +  protected void runLater(State state, Environment env) throws Exception {
      SortedSet<Text> splits = new TreeSet<Text>();
      Random rand = (Random) state.get("rand");
      int count = rand.nextInt(20);
      for (int i = 0; i < count; i++)
        splits.add(new Text(String.format(BulkPlusOne.FMT, (rand.nextLong() & 0x7fffffffffffffffl) % BulkPlusOne.LOTS)));
      log.info("splitting " + splits);
 -    state.getConnector().tableOperations().addSplits(Setup.getTableName(), splits);
 +    env.getConnector().tableOperations().addSplits(Setup.getTableName(), splits);
      log.info("split for " + splits + " finished");
    }
-   
+ 
  }


[6/7] git commit: ACCUMULO-3259 Be a little smarter in Bulk.xml to let the backlog of tasks work itself down before adding more.

Posted by el...@apache.org.
ACCUMULO-3259 Be a little smarter in Bulk.xml to let the backlog of tasks work itself down before adding more.

If the queue size on the threadpool is greater than 50 times the number of
tservers, wait 30 seconds before adding more tasks. This should still ensure
that we can keep Accumulo sufficiently busy without creating an absurd
number of tasks that the test client will never reasonably work through.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1477c130
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1477c130
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1477c130

Branch: refs/heads/1.6
Commit: 1477c130143bd2477fb8e29bc22ba4698aae3599
Parents: c4f3fc8
Author: Josh Elser <el...@apache.org>
Authored: Fri Oct 24 15:01:22 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Oct 24 15:24:47 2014 -0400

----------------------------------------------------------------------
 .../test/randomwalk/bulk/BulkImportTest.java    | 84 ++++++++++++++++++++
 .../test/randomwalk/bulk/BulkMinusOne.java      |  8 +-
 .../test/randomwalk/bulk/BulkPlusOne.java       | 20 ++---
 .../accumulo/test/randomwalk/bulk/BulkTest.java |  8 +-
 .../accumulo/test/randomwalk/bulk/Compact.java  |  6 +-
 .../test/randomwalk/bulk/ConsistencyCheck.java  |  6 +-
 .../accumulo/test/randomwalk/bulk/Merge.java    | 12 +--
 .../test/randomwalk/bulk/SelectiveBulkTest.java | 41 ++++++++++
 .../test/randomwalk/bulk/SelectiveQueueing.java | 48 +++++++++++
 .../accumulo/test/randomwalk/bulk/Split.java    |  6 +-
 10 files changed, 206 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
new file mode 100644
index 0000000..aa44741
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.randomwalk.bulk;
+
+import java.util.Properties;
+
+import org.apache.accumulo.test.randomwalk.State;
+
+/**
+ * If we have a sufficient back-up of imports, let them work off before adding even more bulk-imports. Imports of PlusOne must always be balanced with imports
+ * of MinusOne.
+ */
+public abstract class BulkImportTest extends BulkTest {
+
+  public static final String SKIPPED_IMPORT = "skipped.import", TRUE = Boolean.TRUE.toString(), FALSE = Boolean.FALSE.toString();
+
+  @Override
+  public void visit(final State state, Properties props) throws Exception {
+    /**
+     * Each visit() is performed sequentially and then submitted to the threadpool which will have async execution. As long as we're checking the state and
+     * making decisions about what to do before we submit something to the thread pool, we're fine.
+     */
+
+    String lastImportSkipped = state.getString(SKIPPED_IMPORT);
+    // We have a marker in the state for the previous insert, we have to balance skipping BulkPlusOne
+    // with skipping the new BulkMinusOne to make sure that we maintain consistency
+    if (null != lastImportSkipped) {
+      if (!getClass().equals(BulkMinusOne.class)) {
+        throw new IllegalStateException("Should not have a skipped import marker for a class other than " + BulkMinusOne.class.getName() + " but was "
+            + getClass().getName());
+      }
+
+      if (TRUE.equals(lastImportSkipped)) {
+        log.debug("Last import was skipped, skipping this import to ensure consistency");
+        state.remove(SKIPPED_IMPORT);
+
+        // Wait 30s to balance the skip of a BulkPlusOne/BulkMinusOne pair
+        log.debug("Waiting 30s before continuing");
+        try {
+          Thread.sleep(30 * 1000);
+        } catch (InterruptedException e) {}
+
+        return;
+      } else {
+        // last import was not skipped, remove the marker
+        state.remove(SKIPPED_IMPORT);
+      }
+    }
+
+    if (shouldQueueMoreImports(state)) {
+      super.visit(state, props);
+    } else {
+      log.debug("Not queuing more imports this round because too many are already queued");
+      state.set(SKIPPED_IMPORT, TRUE);
+      // Don't sleep here, let the sleep happen when we skip the next BulkMinusOne
+    }
+  }
+
+  private boolean shouldQueueMoreImports(State state) throws Exception {
+    // Only selectively import when it's BulkPlusOne. If we did a BulkPlusOne,
+    // we must also do a BulkMinusOne to keep the table consistent
+    if (getClass().equals(BulkPlusOne.class)) {
+      // Only queue up more imports if the number of queued tasks already
+      // exceeds the number of tservers by 50x
+      return SelectiveQueueing.shouldQueueOperation(state);
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
index 4ebf23f..1704e49 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
@@ -20,14 +20,14 @@ import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.test.randomwalk.State;
 
-public class BulkMinusOne extends BulkTest {
-  
+public class BulkMinusOne extends BulkImportTest {
+
   private static final Value negOne = new Value("-1".getBytes(Constants.UTF8));
-  
+
   @Override
   protected void runLater(State state) throws Exception {
     log.info("Decrementing");
     BulkPlusOne.bulkLoadLots(log, state, negOne);
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
index cdfbb36..6d56f13 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
-public class BulkPlusOne extends BulkTest {
-  
+public class BulkPlusOne extends BulkImportTest {
+
   public static final int LOTS = 100000;
   public static final int COLS = 10;
   public static final int HEX_SIZE = (int) Math.ceil(Math.log(LOTS) / Math.log(16));
@@ -53,7 +53,7 @@ public class BulkPlusOne extends BulkTest {
   }
   public static final Text MARKER_CF = new Text("marker");
   static final AtomicLong counter = new AtomicLong();
-  
+
   private static final Value ONE = new Value("1".getBytes());
 
   static void bulkLoadLots(Logger log, State state, Value value) throws Exception {
@@ -64,22 +64,22 @@ public class BulkPlusOne extends BulkTest {
     final FileSystem fs = (FileSystem) state.get("fs");
     fs.mkdirs(fail);
     final int parts = rand.nextInt(10) + 1;
-    
+
     TreeSet<Integer> startRows = new TreeSet<Integer>();
     startRows.add(0);
     while (startRows.size() < parts)
       startRows.add(rand.nextInt(LOTS));
-    
+
     List<String> printRows = new ArrayList<String>(startRows.size());
     for (Integer row : startRows)
       printRows.add(String.format(FMT, row));
-    
+
     String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
     log.debug("preparing bulk files with start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
-    
+
     List<Integer> rows = new ArrayList<Integer>(startRows);
     rows.add(LOTS);
-    
+
     for (int i = 0; i < parts; i++) {
       String fileName = dir + "/" + String.format("part_%d.", i) + RFile.EXTENSION;
       FileSKVWriter f = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), defaultConfiguration);
@@ -105,11 +105,11 @@ public class BulkPlusOne extends BulkTest {
     fs.delete(fail, true);
     log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
   }
-  
+
   @Override
   protected void runLater(State state) throws Exception {
     log.info("Incrementing");
     bulkLoadLots(log, state, ONE);
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
index 4afefd9..b24f61a 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
@@ -22,7 +22,7 @@ import org.apache.accumulo.test.randomwalk.State;
 import org.apache.accumulo.test.randomwalk.Test;
 
 public abstract class BulkTest extends Test {
-  
+
   @Override
   public void visit(final State state, Properties props) throws Exception {
     Setup.run(state, new Runnable() {
@@ -34,10 +34,10 @@ public abstract class BulkTest extends Test {
           log.error(ex, ex);
         }
       }
-      
+
     });
   }
-  
+
   abstract protected void runLater(State state) throws Exception;
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
index 86dae5c..8b17256 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
@@ -19,8 +19,8 @@ package org.apache.accumulo.test.randomwalk.bulk;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.hadoop.io.Text;
 
-public class Compact extends BulkTest {
-  
+public class Compact extends SelectiveBulkTest {
+
   @Override
   protected void runLater(State state) throws Exception {
     final Text[] points = Merge.getRandomTabletRange(state);
@@ -29,5 +29,5 @@ public class Compact extends BulkTest {
     state.getConnector().tableOperations().compact(Setup.getTableName(), points[0], points[1], false, true);
     log.info("Compaction " + rangeString + " finished");
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
index e60f8cf..7e528a7 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
@@ -28,8 +28,8 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.hadoop.io.Text;
 
-public class ConsistencyCheck extends BulkTest {
-  
+public class ConsistencyCheck extends SelectiveBulkTest {
+
   @Override
   protected void runLater(State state) throws Exception {
     Random rand = (Random) state.get("rand");
@@ -52,5 +52,5 @@ public class ConsistencyCheck extends BulkTest {
         throw new RuntimeException("Inconsistent value at " + entry.getKey() + " was " + entry.getValue() + " should be " + v + " first read at " + first);
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
index 2dd0345..9d66e0c 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
@@ -22,8 +22,8 @@ import java.util.Random;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.hadoop.io.Text;
 
-public class Merge extends BulkTest {
-  
+public class Merge extends SelectiveBulkTest {
+
   @Override
   protected void runLater(State state) throws Exception {
     Text[] points = getRandomTabletRange(state);
@@ -31,15 +31,15 @@ public class Merge extends BulkTest {
     state.getConnector().tableOperations().merge(Setup.getTableName(), points[0], points[1]);
     log.info("merging " + rangeToString(points) + " complete");
   }
-  
+
   public static String rangeToString(Text[] points) {
     return "(" + (points[0] == null ? "-inf" : points[0]) + " -> " + (points[1] == null ? "+inf" : points[1]) + "]";
   }
-  
+
   public static Text getRandomRow(Random rand) {
     return new Text(String.format(BulkPlusOne.FMT, (rand.nextLong() & 0x7fffffffffffffffl) % BulkPlusOne.LOTS));
   }
-  
+
   public static Text[] getRandomTabletRange(State state) {
     Random rand = (Random) state.get("rand");
     Text points[] = {getRandomRow(rand), getRandomRow(rand),};
@@ -56,5 +56,5 @@ public class Merge extends BulkTest {
     }
     return points;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
new file mode 100644
index 0000000..ca66775
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.randomwalk.bulk;
+
+import java.util.Properties;
+
+import org.apache.accumulo.test.randomwalk.State;
+
+/**
+ * Selectively runs the actual {@link BulkTest} based on the number of active TServers and the number of queued operations.
+ */
+public abstract class SelectiveBulkTest extends BulkTest {
+
+  @Override
+  public void visit(State state, Properties props) throws Exception {
+    if (SelectiveQueueing.shouldQueueOperation(state)) {
+      super.visit(state, props);
+    } else {
+      log.debug("Skipping queueing of " + getClass().getSimpleName() + " because of excessive queued tasks already");
+      log.debug("Waiting 30 seconds before continuing");
+      try {
+        Thread.sleep(30 * 1000);
+      } catch (InterruptedException e) {}
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
new file mode 100644
index 0000000..e6dde4d
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.randomwalk.bulk;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.test.randomwalk.State;
+import org.apache.log4j.Logger;
+
+/**
+ * Chooses whether or not an operation should be queued based on the current thread pool queue length and the number of available TServers.
+ */
+public class SelectiveQueueing {
+  private static final Logger log = Logger.getLogger(SelectiveQueueing.class);
+
+  public static boolean shouldQueueOperation(State state) throws Exception {
+    final ThreadPoolExecutor pool = (ThreadPoolExecutor) state.get("pool");
+    long queuedThreads = pool.getTaskCount() - pool.getActiveCount() - pool.getCompletedTaskCount();
+    final Connector conn = state.getConnector();
+    int numTservers = conn.instanceOperations().getTabletServers().size();
+
+    if (!shouldQueue(queuedThreads, numTservers)) {
+      log.info("Not queueing because of " + queuedThreads + " outstanding tasks");
+      return false;
+    }
+
+    return true;
+  }
+
+  private static boolean shouldQueue(long queuedThreads, int numTservers) {
+    return queuedThreads < numTservers * 50;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
index 157e2ab..7a93321 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
@@ -23,8 +23,8 @@ import java.util.TreeSet;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.hadoop.io.Text;
 
-public class Split extends BulkTest {
-  
+public class Split extends SelectiveBulkTest {
+
   @Override
   protected void runLater(State state) throws Exception {
     SortedSet<Text> splits = new TreeSet<Text>();
@@ -36,5 +36,5 @@ public class Split extends BulkTest {
     state.getConnector().tableOperations().addSplits(Setup.getTableName(), splits);
     log.info("split for " + splits + " finished");
   }
-  
+
 }


[5/7] git commit: ACCUMULO-3257 Include the actual Node's name in log message.

Posted by el...@apache.org.
ACCUMULO-3257 Include the actual Node's name in log message.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1635ad5c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1635ad5c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1635ad5c

Branch: refs/heads/master
Commit: 1635ad5c763de77c793262f925e2f7f12b52b1c7
Parents: 1477c13
Author: Josh Elser <el...@apache.org>
Authored: Fri Oct 24 15:23:08 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Oct 24 15:24:47 2014 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/accumulo/test/randomwalk/Module.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1635ad5c/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
index 57b49ae..5756934 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
@@ -294,13 +294,13 @@ public class Module extends Node {
             // Bound the time we'll wait for the node to complete
             nodeException = task.get(secondsRemaining, TimeUnit.SECONDS);
           } catch (InterruptedException e) {
-            log.warn("Interrupted waiting for RandomWalk node to complete. Exiting.", e);
+            log.warn("Interrupted waiting for " + nextNode.getClass().getSimpleName() + " to complete. Exiting.", e);
             break;
           } catch (ExecutionException e) {
-            log.error("Caught error executing RandomWalk node", e);
+            log.error("Caught error executing " + nextNode.getClass().getSimpleName(), e);
             throw e;
           } catch (TimeoutException e) {
-            log.info("Timed out waiting for RandomWalk node to complete (waited " + secondsRemaining + " seconds). Exiting.", e);
+            log.info("Timed out waiting for " + nextNode.getClass().getSimpleName() + " to complete (waited " + secondsRemaining + " seconds). Exiting.", e);
             break;
           }
 


[3/7] git commit: ACCUMULO-3259 Be a little smarter in Bulk.xml to let the backlog of tasks work itself down before adding more.

Posted by el...@apache.org.
ACCUMULO-3259 Be a little smarter in Bulk.xml to let the backlog of tasks work itself down before adding more.

If the queue size on the threadpool is greater than 50 times the number of
tservers, wait 30 seconds before adding more tasks. This should still ensure
that we can keep Accumulo sufficiently busy without creating an absurd
number of tasks that the test client will never reasonably work through.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1477c130
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1477c130
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1477c130

Branch: refs/heads/master
Commit: 1477c130143bd2477fb8e29bc22ba4698aae3599
Parents: c4f3fc8
Author: Josh Elser <el...@apache.org>
Authored: Fri Oct 24 15:01:22 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Oct 24 15:24:47 2014 -0400

----------------------------------------------------------------------
 .../test/randomwalk/bulk/BulkImportTest.java    | 84 ++++++++++++++++++++
 .../test/randomwalk/bulk/BulkMinusOne.java      |  8 +-
 .../test/randomwalk/bulk/BulkPlusOne.java       | 20 ++---
 .../accumulo/test/randomwalk/bulk/BulkTest.java |  8 +-
 .../accumulo/test/randomwalk/bulk/Compact.java  |  6 +-
 .../test/randomwalk/bulk/ConsistencyCheck.java  |  6 +-
 .../accumulo/test/randomwalk/bulk/Merge.java    | 12 +--
 .../test/randomwalk/bulk/SelectiveBulkTest.java | 41 ++++++++++
 .../test/randomwalk/bulk/SelectiveQueueing.java | 48 +++++++++++
 .../accumulo/test/randomwalk/bulk/Split.java    |  6 +-
 10 files changed, 206 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
new file mode 100644
index 0000000..aa44741
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkImportTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.randomwalk.bulk;
+
+import java.util.Properties;
+
+import org.apache.accumulo.test.randomwalk.State;
+
+/**
+ * If we have a sufficient back-up of imports, let them work off before adding even more bulk-imports. Imports of PlusOne must always be balanced with imports
+ * of MinusOne.
+ */
+public abstract class BulkImportTest extends BulkTest {
+
+  public static final String SKIPPED_IMPORT = "skipped.import", TRUE = Boolean.TRUE.toString(), FALSE = Boolean.FALSE.toString();
+
+  @Override
+  public void visit(final State state, Properties props) throws Exception {
+    /**
+     * Each visit() is performed sequentially and then submitted to the threadpool which will have async execution. As long as we're checking the state and
+     * making decisions about what to do before we submit something to the thread pool, we're fine.
+     */
+
+    String lastImportSkipped = state.getString(SKIPPED_IMPORT);
+    // We have a marker in the state for the previous insert, we have to balance skipping BulkPlusOne
+    // with skipping the new BulkMinusOne to make sure that we maintain consistency
+    if (null != lastImportSkipped) {
+      if (!getClass().equals(BulkMinusOne.class)) {
+        throw new IllegalStateException("Should not have a skipped import marker for a class other than " + BulkMinusOne.class.getName() + " but was "
+            + getClass().getName());
+      }
+
+      if (TRUE.equals(lastImportSkipped)) {
+        log.debug("Last import was skipped, skipping this import to ensure consistency");
+        state.remove(SKIPPED_IMPORT);
+
+        // Wait 30s to balance the skip of a BulkPlusOne/BulkMinusOne pair
+        log.debug("Waiting 30s before continuing");
+        try {
+          Thread.sleep(30 * 1000);
+        } catch (InterruptedException e) {}
+
+        return;
+      } else {
+        // last import was not skipped, remove the marker
+        state.remove(SKIPPED_IMPORT);
+      }
+    }
+
+    if (shouldQueueMoreImports(state)) {
+      super.visit(state, props);
+    } else {
+      log.debug("Not queuing more imports this round because too many are already queued");
+      state.set(SKIPPED_IMPORT, TRUE);
+      // Don't sleep here, let the sleep happen when we skip the next BulkMinusOne
+    }
+  }
+
+  private boolean shouldQueueMoreImports(State state) throws Exception {
+    // Only selectively import when it's BulkPlusOne. If we did a BulkPlusOne,
+    // we must also do a BulkMinusOne to keep the table consistent
+    if (getClass().equals(BulkPlusOne.class)) {
+      // Only queue up more imports if the number of queued tasks already
+      // exceeds the number of tservers by 50x
+      return SelectiveQueueing.shouldQueueOperation(state);
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
index 4ebf23f..1704e49 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java
@@ -20,14 +20,14 @@ import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.test.randomwalk.State;
 
-public class BulkMinusOne extends BulkTest {
-  
+public class BulkMinusOne extends BulkImportTest {
+
   private static final Value negOne = new Value("-1".getBytes(Constants.UTF8));
-  
+
   @Override
   protected void runLater(State state) throws Exception {
     log.info("Decrementing");
     BulkPlusOne.bulkLoadLots(log, state, negOne);
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
index cdfbb36..6d56f13 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
-public class BulkPlusOne extends BulkTest {
-  
+public class BulkPlusOne extends BulkImportTest {
+
   public static final int LOTS = 100000;
   public static final int COLS = 10;
   public static final int HEX_SIZE = (int) Math.ceil(Math.log(LOTS) / Math.log(16));
@@ -53,7 +53,7 @@ public class BulkPlusOne extends BulkTest {
   }
   public static final Text MARKER_CF = new Text("marker");
   static final AtomicLong counter = new AtomicLong();
-  
+
   private static final Value ONE = new Value("1".getBytes());
 
   static void bulkLoadLots(Logger log, State state, Value value) throws Exception {
@@ -64,22 +64,22 @@ public class BulkPlusOne extends BulkTest {
     final FileSystem fs = (FileSystem) state.get("fs");
     fs.mkdirs(fail);
     final int parts = rand.nextInt(10) + 1;
-    
+
     TreeSet<Integer> startRows = new TreeSet<Integer>();
     startRows.add(0);
     while (startRows.size() < parts)
       startRows.add(rand.nextInt(LOTS));
-    
+
     List<String> printRows = new ArrayList<String>(startRows.size());
     for (Integer row : startRows)
       printRows.add(String.format(FMT, row));
-    
+
     String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
     log.debug("preparing bulk files with start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
-    
+
     List<Integer> rows = new ArrayList<Integer>(startRows);
     rows.add(LOTS);
-    
+
     for (int i = 0; i < parts; i++) {
       String fileName = dir + "/" + String.format("part_%d.", i) + RFile.EXTENSION;
       FileSKVWriter f = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), defaultConfiguration);
@@ -105,11 +105,11 @@ public class BulkPlusOne extends BulkTest {
     fs.delete(fail, true);
     log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
   }
-  
+
   @Override
   protected void runLater(State state) throws Exception {
     log.info("Incrementing");
     bulkLoadLots(log, state, ONE);
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
index 4afefd9..b24f61a 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkTest.java
@@ -22,7 +22,7 @@ import org.apache.accumulo.test.randomwalk.State;
 import org.apache.accumulo.test.randomwalk.Test;
 
 public abstract class BulkTest extends Test {
-  
+
   @Override
   public void visit(final State state, Properties props) throws Exception {
     Setup.run(state, new Runnable() {
@@ -34,10 +34,10 @@ public abstract class BulkTest extends Test {
           log.error(ex, ex);
         }
       }
-      
+
     });
   }
-  
+
   abstract protected void runLater(State state) throws Exception;
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
index 86dae5c..8b17256 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Compact.java
@@ -19,8 +19,8 @@ package org.apache.accumulo.test.randomwalk.bulk;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.hadoop.io.Text;
 
-public class Compact extends BulkTest {
-  
+public class Compact extends SelectiveBulkTest {
+
   @Override
   protected void runLater(State state) throws Exception {
     final Text[] points = Merge.getRandomTabletRange(state);
@@ -29,5 +29,5 @@ public class Compact extends BulkTest {
     state.getConnector().tableOperations().compact(Setup.getTableName(), points[0], points[1], false, true);
     log.info("Compaction " + rangeString + " finished");
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
index e60f8cf..7e528a7 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java
@@ -28,8 +28,8 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.hadoop.io.Text;
 
-public class ConsistencyCheck extends BulkTest {
-  
+public class ConsistencyCheck extends SelectiveBulkTest {
+
   @Override
   protected void runLater(State state) throws Exception {
     Random rand = (Random) state.get("rand");
@@ -52,5 +52,5 @@ public class ConsistencyCheck extends BulkTest {
         throw new RuntimeException("Inconsistent value at " + entry.getKey() + " was " + entry.getValue() + " should be " + v + " first read at " + first);
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
index 2dd0345..9d66e0c 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Merge.java
@@ -22,8 +22,8 @@ import java.util.Random;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.hadoop.io.Text;
 
-public class Merge extends BulkTest {
-  
+public class Merge extends SelectiveBulkTest {
+
   @Override
   protected void runLater(State state) throws Exception {
     Text[] points = getRandomTabletRange(state);
@@ -31,15 +31,15 @@ public class Merge extends BulkTest {
     state.getConnector().tableOperations().merge(Setup.getTableName(), points[0], points[1]);
     log.info("merging " + rangeToString(points) + " complete");
   }
-  
+
   public static String rangeToString(Text[] points) {
     return "(" + (points[0] == null ? "-inf" : points[0]) + " -> " + (points[1] == null ? "+inf" : points[1]) + "]";
   }
-  
+
   public static Text getRandomRow(Random rand) {
     return new Text(String.format(BulkPlusOne.FMT, (rand.nextLong() & 0x7fffffffffffffffl) % BulkPlusOne.LOTS));
   }
-  
+
   public static Text[] getRandomTabletRange(State state) {
     Random rand = (Random) state.get("rand");
     Text points[] = {getRandomRow(rand), getRandomRow(rand),};
@@ -56,5 +56,5 @@ public class Merge extends BulkTest {
     }
     return points;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
new file mode 100644
index 0000000..ca66775
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveBulkTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.randomwalk.bulk;
+
+import java.util.Properties;
+
+import org.apache.accumulo.test.randomwalk.State;
+
+/**
+ * Selectively runs the actual {@link BulkTest} based on the number of active TServers and the number of queued operations.
+ */
+public abstract class SelectiveBulkTest extends BulkTest {
+
+  @Override
+  public void visit(State state, Properties props) throws Exception {
+    if (SelectiveQueueing.shouldQueueOperation(state)) {
+      super.visit(state, props);
+    } else {
+      log.debug("Skipping queueing of " + getClass().getSimpleName() + " because of excessive queued tasks already");
+      log.debug("Waiting 30 seconds before continuing");
+      try {
+        Thread.sleep(30 * 1000);
+      } catch (InterruptedException e) {}
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
new file mode 100644
index 0000000..e6dde4d
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/SelectiveQueueing.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.randomwalk.bulk;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.test.randomwalk.State;
+import org.apache.log4j.Logger;
+
+/**
+ * Chooses whether or not an operation should be queued based on the current thread pool queue length and the number of available TServers.
+ */
+public class SelectiveQueueing {
+  private static final Logger log = Logger.getLogger(SelectiveQueueing.class);
+
+  public static boolean shouldQueueOperation(State state) throws Exception {
+    final ThreadPoolExecutor pool = (ThreadPoolExecutor) state.get("pool");
+    long queuedThreads = pool.getTaskCount() - pool.getActiveCount() - pool.getCompletedTaskCount();
+    final Connector conn = state.getConnector();
+    int numTservers = conn.instanceOperations().getTabletServers().size();
+
+    if (!shouldQueue(queuedThreads, numTservers)) {
+      log.info("Not queueing because of " + queuedThreads + " outstanding tasks");
+      return false;
+    }
+
+    return true;
+  }
+
+  private static boolean shouldQueue(long queuedThreads, int numTservers) {
+    return queuedThreads < numTservers * 50;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1477c130/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
index 157e2ab..7a93321 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/Split.java
@@ -23,8 +23,8 @@ import java.util.TreeSet;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.hadoop.io.Text;
 
-public class Split extends BulkTest {
-  
+public class Split extends SelectiveBulkTest {
+
   @Override
   protected void runLater(State state) throws Exception {
     SortedSet<Text> splits = new TreeSet<Text>();
@@ -36,5 +36,5 @@ public class Split extends BulkTest {
     state.getConnector().tableOperations().addSplits(Setup.getTableName(), splits);
     log.info("split for " + splits + " finished");
   }
-  
+
 }


[2/7] git commit: ACCUMULO-3257 Use a threadpool to ensure that a Node doesn't exceed maxSec.

Posted by el...@apache.org.
ACCUMULO-3257 Use a threadpool to ensure that a Node doesn't exceed maxSec.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c4f3fc83
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c4f3fc83
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c4f3fc83

Branch: refs/heads/master
Commit: c4f3fc8317bd91aa02c71a405356545e1561d247
Parents: e3a743c
Author: Josh Elser <el...@apache.org>
Authored: Thu Oct 23 16:29:40 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Oct 24 15:24:28 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/test/randomwalk/Module.java | 419 +++++++++++--------
 1 file changed, 241 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4f3fc83/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
index c71d2d0..57b49ae 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
@@ -27,6 +27,12 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.xml.XMLConstants;
@@ -37,6 +43,7 @@ import javax.xml.validation.SchemaFactory;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.log4j.Level;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
@@ -46,15 +53,15 @@ import org.w3c.dom.NodeList;
  * A module is directed graph of tests
  */
 public class Module extends Node {
-  
+
   private class Dummy extends Node {
-    
+
     String name;
-    
+
     Dummy(String name) {
       this.name = name;
     }
-    
+
     @Override
     public void visit(State state, Properties props) {
       String print;
@@ -63,86 +70,88 @@ public class Module extends Node {
         log.log(level, name);
       }
     }
-    
+
+    @Override
     public String toString() {
       return name;
     }
   }
-  
+
   private class Alias extends Node {
-    
+
     Node target;
     String targetId;
     String id;
-    
+
     Alias(String id) {
       target = null;
       this.id = id;
     }
-    
+
     @Override
     public void visit(State state, Properties props) throws Exception {
       throw new Exception("You don't visit aliases!");
     }
-    
+
+    @Override
     public String toString() {
       return id;
     }
-    
+
     public void update(String node) throws Exception {
       targetId = node;
       target = getNode(node);
     }
-    
+
     public Node get() {
       return target;
     }
-    
+
     public String getTargetId() {
       return targetId;
     }
   }
-  
+
   private HashMap<String,Node> nodes = new HashMap<String,Node>();
   private HashMap<String,Properties> localProps = new HashMap<String,Properties>();
-  
+
   private class Edge {
     String nodeId;
     int weight;
   }
-  
+
   private class AdjList {
-    
+
     private List<Edge> edges = new ArrayList<Edge>();
     private int totalWeight = 0;
     private Random rand = new Random();
-    
+
     /**
      * Adds a neighbor node and weight of edge
      */
     private void addEdge(String nodeId, int weight) {
-      
+
       totalWeight += weight;
-      
+
       Edge e = new Edge();
       e.nodeId = nodeId;
       e.weight = weight;
       edges.add(e);
     }
-    
+
     /**
      * Chooses a random neighbor node
-     * 
+     *
      * @return Node or null if no edges
      */
     private String randomNeighbor() throws Exception {
-      
+
       String nodeId = null;
       rand = new Random();
-      
+
       int randNum = rand.nextInt(totalWeight) + 1;
       int sum = 0;
-      
+
       for (Edge e : edges) {
         nodeId = e.nodeId;
         sum += e.weight;
@@ -153,24 +162,24 @@ public class Module extends Node {
       return nodeId;
     }
   }
-  
+
   private HashMap<String,String> prefixes = new HashMap<String,String>();
   private HashMap<String,AdjList> adjMap = new HashMap<String,AdjList>();
   private HashMap<String,Set<String>> aliasMap = new HashMap<String,Set<String>>();
   private final File xmlFile;
   private String initNodeId;
   private Fixture fixture = null;
-  
+
   public Module(File xmlFile) throws Exception {
     this.xmlFile = xmlFile;
     loadFromXml();
   }
-  
+
   @Override
-  public void visit(State state, Properties props) throws Exception {
+  public void visit(final State state, Properties props) throws Exception {
     int maxHops, maxSec;
     boolean teardown;
-    
+
     Properties initProps = getProps("_init");
     initProps.putAll(props);
     String prop;
@@ -178,136 +187,190 @@ public class Module extends Node {
       maxHops = Integer.MAX_VALUE;
     else
       maxHops = Integer.parseInt(initProps.getProperty("maxHops", "0"));
-    
+
     if ((prop = initProps.getProperty("maxSec")) == null || prop.equals("0") || prop.equals(""))
       maxSec = Integer.MAX_VALUE;
     else
       maxSec = Integer.parseInt(initProps.getProperty("maxSec", "0"));
-    
+
     if ((prop = initProps.getProperty("teardown")) == null || prop.equals("true") || prop.equals(""))
       teardown = true;
     else
       teardown = false;
-    
+
     if (fixture != null) {
       fixture.setUp(state);
     }
-    
-    Node initNode = getNode(initNodeId);
-    
-    boolean test = false;
-    if (initNode instanceof Test) {
-      startTimer(initNode);
-      test = true;
-    }
-    initNode.visit(state, getProps(initNodeId));
-    if (test)
-      stopTimer(initNode);
-    
-    state.visitedNode();
-    // update aliases
-    Set<String> aliases;
-    if ((aliases = aliasMap.get(initNodeId)) != null)
-      for (String alias : aliases) {
-        ((Alias) nodes.get(alias)).update(initNodeId);
-      }
-    
-    String curNodeId = initNodeId;
-    int numHops = 0;
-    long startTime = System.currentTimeMillis() / 1000;
-    while (true) {
-      // check if END state was reached
-      if (curNodeId.equalsIgnoreCase("END")) {
-        log.debug("reached END state");
-        break;
-      }
-      // check if maxSec was reached
-      long curTime = System.currentTimeMillis() / 1000;
-      if ((curTime - startTime) > maxSec) {
-        log.debug("reached maxSec(" + maxSec + ")");
-        break;
-      }
-      // check if maxHops was reached
-      if (numHops > maxHops) {
-        log.debug("reached maxHops(" + maxHops + ")");
-        break;
-      }
-      numHops++;
-      
-      if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
-        throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")");
-      }
-      AdjList adj = adjMap.get(curNodeId);
-      String nextNodeId = adj.randomNeighbor();
-      Node nextNode = getNode(nextNodeId);
-      if (nextNode instanceof Alias) {
-        nextNodeId = ((Alias) nextNode).getTargetId();
-        nextNode = ((Alias) nextNode).get();
+
+    ExecutorService service = new SimpleThreadPool(1, "RandomWalk Runner");
+
+    try {
+      Node initNode = getNode(initNodeId);
+
+      boolean test = false;
+      if (initNode instanceof Test) {
+        startTimer(initNode);
+        test = true;
       }
-      Properties nodeProps = getProps(nextNodeId);
-      try {
-        test = false;
-        if (nextNode instanceof Test) {
-          startTimer(nextNode);
-          test = true;
+      initNode.visit(state, getProps(initNodeId));
+      if (test)
+        stopTimer(initNode);
+
+      state.visitedNode();
+      // update aliases
+      Set<String> aliases;
+      if ((aliases = aliasMap.get(initNodeId)) != null)
+        for (String alias : aliases) {
+          ((Alias) nodes.get(alias)).update(initNodeId);
         }
-        nextNode.visit(state, nodeProps);
-        if (test)
-          stopTimer(nextNode);
-      } catch (Exception e) {
-        log.debug("Connector belongs to user: " + state.getConnector().whoami());
-        log.debug("Exception occured at: " + System.currentTimeMillis());
-        log.debug("Properties for node: " + nextNodeId);
-        for (Entry<Object,Object> entry : nodeProps.entrySet()) {
-          log.debug("  " + entry.getKey() + ": " + entry.getValue());
+
+      String curNodeId = initNodeId;
+      int numHops = 0;
+      long startTime = System.currentTimeMillis() / 1000;
+      while (true) {
+        // check if END state was reached
+        if (curNodeId.equalsIgnoreCase("END")) {
+          log.debug("reached END state");
+          break;
         }
-        log.debug("Overall Properties");
-        for (Entry<Object,Object> entry : state.getProperties().entrySet()) {
-          log.debug("  " + entry.getKey() + ": " + entry.getValue());
+        // check if maxSec was reached
+        long curTime = System.currentTimeMillis() / 1000;
+        if ((curTime - startTime) > maxSec) {
+          log.debug("reached maxSec(" + maxSec + ")");
+          break;
         }
-        log.debug("State information");
-        for (String key : new TreeSet<String>(state.getMap().keySet()))  {
-          Object value = state.getMap().get(key);
-          String logMsg = "  " + key + ": ";
-          if (value == null)
-            logMsg += "null";
-          else if (value instanceof String || value instanceof Map || value instanceof Collection || value instanceof Number)
-            logMsg += value;
-          else if (value instanceof byte[])
-            logMsg += new String((byte[])value, Constants.UTF8);
-          else if (value instanceof PasswordToken)
-            logMsg += new String(((PasswordToken) value).getPassword(), Constants.UTF8);
-          else
-            logMsg += value.getClass()+ " - " + value;
-          
-          log.debug(logMsg);
+
+        // The number of seconds before the test should exit
+        long secondsRemaining = maxSec - (curTime - startTime);
+
+        // check if maxHops was reached
+        if (numHops > maxHops) {
+          log.debug("reached maxHops(" + maxHops + ")");
+          break;
         }
-        throw new Exception("Error running node " + nextNodeId, e);
-      }
-      state.visitedNode();
-      
-      // update aliases
-      if ((aliases = aliasMap.get(curNodeId)) != null)
-        for (String alias : aliases) {
-          ((Alias) nodes.get(alias)).update(curNodeId);
+        numHops++;
+
+        if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
+          throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")");
+        }
+        AdjList adj = adjMap.get(curNodeId);
+        String nextNodeId = adj.randomNeighbor();
+        final Node nextNode;
+        Node nextNodeOrAlias = getNode(nextNodeId);
+        if (nextNodeOrAlias instanceof Alias) {
+          nextNodeId = ((Alias) nextNodeOrAlias).getTargetId();
+          nextNode = ((Alias) nextNodeOrAlias).get();
+        } else {
+          nextNode = nextNodeOrAlias;
+        }
+        final Properties nodeProps = getProps(nextNodeId);
+        try {
+          test = false;
+          if (nextNode instanceof Test) {
+            startTimer(nextNode);
+            test = true;
+          }
+
+          // Wrap the visit of the next node in the module in a callable that returns a thrown exception
+          FutureTask<Exception> task = new FutureTask<Exception>(new Callable<Exception>() {
+
+            @Override
+            public Exception call() throws Exception {
+              try {
+                nextNode.visit(state, nodeProps);
+                return null;
+              } catch (Exception e) {
+                return e;
+              }
+            }
+
+          });
+
+          // Run the task (should execute immediately)
+          service.submit(task);
+
+          Exception nodeException;
+          try {
+            // Bound the time we'll wait for the node to complete
+            nodeException = task.get(secondsRemaining, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            log.warn("Interrupted waiting for RandomWalk node to complete. Exiting.", e);
+            break;
+          } catch (ExecutionException e) {
+            log.error("Caught error executing RandomWalk node", e);
+            throw e;
+          } catch (TimeoutException e) {
+            log.info("Timed out waiting for RandomWalk node to complete (waited " + secondsRemaining + " seconds). Exiting.", e);
+            break;
+          }
+
+          // The RandomWalk node throw an Exception that that Callable handed back
+          // Throw it and let the Module perform cleanup
+          if (null != nodeException) {
+            throw nodeException;
+          }
+
+          if (test)
+            stopTimer(nextNode);
+        } catch (Exception e) {
+          log.debug("Connector belongs to user: " + state.getConnector().whoami());
+          log.debug("Exception occured at: " + System.currentTimeMillis());
+          log.debug("Properties for node: " + nextNodeId);
+          for (Entry<Object,Object> entry : nodeProps.entrySet()) {
+            log.debug("  " + entry.getKey() + ": " + entry.getValue());
+          }
+          log.debug("Overall Properties");
+          for (Entry<Object,Object> entry : state.getProperties().entrySet()) {
+            log.debug("  " + entry.getKey() + ": " + entry.getValue());
+          }
+          log.debug("State information");
+          for (String key : new TreeSet<String>(state.getMap().keySet())) {
+            Object value = state.getMap().get(key);
+            String logMsg = "  " + key + ": ";
+            if (value == null)
+              logMsg += "null";
+            else if (value instanceof String || value instanceof Map || value instanceof Collection || value instanceof Number)
+              logMsg += value;
+            else if (value instanceof byte[])
+              logMsg += new String((byte[]) value, Constants.UTF8);
+            else if (value instanceof PasswordToken)
+              logMsg += new String(((PasswordToken) value).getPassword(), Constants.UTF8);
+            else
+              logMsg += value.getClass() + " - " + value;
+
+            log.debug(logMsg);
+          }
+          throw new Exception("Error running node " + nextNodeId, e);
         }
-      
-      curNodeId = nextNodeId;
+        state.visitedNode();
+
+        // update aliases
+        if ((aliases = aliasMap.get(curNodeId)) != null)
+          for (String alias : aliases) {
+            ((Alias) nodes.get(alias)).update(curNodeId);
+          }
+
+        curNodeId = nextNodeId;
+      }
+    } finally {
+      if (null != service) {
+        service.shutdownNow();
+      }
     }
-    
+
     if (teardown && (fixture != null)) {
       log.debug("tearing down module");
       fixture.tearDown(state);
     }
   }
-  
+
   Thread timer = null;
   final int time = 5 * 1000 * 60;
   AtomicBoolean runningLong = new AtomicBoolean(false);
   long systemTime;
-  
+
   /**
-   * 
+   *
    */
   private void startTimer(final Node initNode) {
     runningLong.set(false);
@@ -330,9 +393,9 @@ public class Module extends Node {
     initNode.makingProgress();
     timer.start();
   }
-  
+
   /**
-   * 
+   *
    */
   private void stopTimer(Node nextNode) {
     synchronized (timer) {
@@ -346,31 +409,31 @@ public class Module extends Node {
     if (runningLong.get())
       log.warn("Node " + nextNode + ", which was running long, has now completed after " + (System.currentTimeMillis() - systemTime) / 1000.0 + " seconds");
   }
-  
+
   @Override
   public String toString() {
     return xmlFile.toString();
   }
-  
+
   private String getFullName(String name) {
-    
+
     int index = name.indexOf(".");
     if (index == -1 || name.endsWith(".xml")) {
       return name;
     }
-    
+
     String id = name.substring(0, index);
-    
+
     if (!prefixes.containsKey(id)) {
       log.warn("Id (" + id + ") was not found in prefixes");
       return name;
     }
-    
+
     return prefixes.get(id).concat(name.substring(index + 1));
   }
-  
+
   private Node createNode(String id, String src) throws Exception {
-    
+
     // check if id indicates dummy node
     if (id.equalsIgnoreCase("END") || id.startsWith("dummy")) {
       if (nodes.containsKey(id) == false) {
@@ -378,14 +441,14 @@ public class Module extends Node {
       }
       return nodes.get(id);
     }
-    
+
     if (id.startsWith("alias")) {
       if (nodes.containsKey(id) == false) {
         nodes.put(id, new Alias(id));
       }
       return nodes.get(id);
     }
-    
+
     // grab node from framework based on its id or src
     Node node;
     if (src == null || src.isEmpty()) {
@@ -393,44 +456,44 @@ public class Module extends Node {
     } else {
       node = Framework.getInstance().getNode(getFullName(src));
     }
-    
+
     // add to node to this module's map
     nodes.put(id, node);
-    
+
     return node;
   }
-  
+
   private Node getNode(String id) throws Exception {
-    
+
     if (nodes.containsKey(id)) {
       return nodes.get(id);
     }
-    
+
     if (id.equalsIgnoreCase("END")) {
       nodes.put(id, new Dummy(id));
       return nodes.get(id);
     }
-    
+
     return Framework.getInstance().getNode(getFullName(id));
   }
-  
+
   private Properties getProps(String nodeId) {
     if (localProps.containsKey(nodeId)) {
       return localProps.get(nodeId);
     }
     return new Properties();
   }
-  
+
   private void loadFromXml() throws Exception {
     DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
     DocumentBuilder docbuilder;
     Document d = null;
-    
+
     // set the schema
     SchemaFactory sf = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
     Schema moduleSchema = sf.newSchema(this.getClass().getClassLoader().getResource("randomwalk/module.xsd"));
     dbf.setSchema(moduleSchema);
-    
+
     // parse the document
     try {
       docbuilder = dbf.newDocumentBuilder();
@@ -439,7 +502,7 @@ public class Module extends Node {
       log.error("Failed to parse: " + xmlFile, e);
       throw new Exception("Failed to parse: " + xmlFile);
     }
-    
+
     // parse packages
     NodeList nodelist = d.getDocumentElement().getElementsByTagName("package");
     for (int i = 0; i < nodelist.getLength(); i++) {
@@ -450,91 +513,91 @@ public class Module extends Node {
       }
       prefixes.put(el.getAttribute("prefix"), value);
     }
-    
+
     // parse fixture node
     nodelist = d.getDocumentElement().getElementsByTagName("fixture");
     if (nodelist.getLength() > 0) {
       Element fixtureEl = (Element) nodelist.item(0);
       fixture = (Fixture) Class.forName(getFullName(fixtureEl.getAttribute("id"))).newInstance();
     }
-    
+
     // parse initial node
     Element initEl = (Element) d.getDocumentElement().getElementsByTagName("init").item(0);
     initNodeId = initEl.getAttribute("id");
     Properties initProps = new Properties();
     String attr = initEl.getAttribute("maxHops");
-    
+
     if (attr != null)
       initProps.setProperty("maxHops", attr);
     attr = initEl.getAttribute("maxSec");
-    
+
     if (attr != null)
       initProps.setProperty("maxSec", attr);
     attr = initEl.getAttribute("teardown");
-    
+
     if (attr != null)
       initProps.setProperty("teardown", attr);
     localProps.put("_init", initProps);
-    
+
     // parse all nodes
     nodelist = d.getDocumentElement().getElementsByTagName("node");
     for (int i = 0; i < nodelist.getLength(); i++) {
-      
+
       Element nodeEl = (Element) nodelist.item(i);
-      
+
       // get attributes
       String id = nodeEl.getAttribute("id");
       if (adjMap.containsKey(id)) {
         throw new Exception("Module already contains: " + id);
       }
       String src = nodeEl.getAttribute("src");
-      
+
       // create node
       createNode(id, src);
-      
+
       // set some attributes in properties for later use
       Properties props = new Properties();
       props.setProperty("maxHops", nodeEl.getAttribute("maxHops"));
       props.setProperty("maxSec", nodeEl.getAttribute("maxSec"));
       props.setProperty("teardown", nodeEl.getAttribute("teardown"));
-      
+
       // parse aliases
       NodeList aliaslist = nodeEl.getElementsByTagName("alias");
       Set<String> aliases = new TreeSet<String>();
       for (int j = 0; j < aliaslist.getLength(); j++) {
         Element propEl = (Element) aliaslist.item(j);
-        
+
         if (!propEl.hasAttribute("name")) {
           throw new Exception("Node " + id + " has alias with no identifying name");
         }
-        
+
         String key = "alias." + propEl.getAttribute("name");
-        
+
         aliases.add(key);
         createNode(key, null);
       }
       if (aliases.size() > 0)
         aliasMap.put(id, aliases);
-      
+
       // parse properties of nodes
       NodeList proplist = nodeEl.getElementsByTagName("property");
       for (int j = 0; j < proplist.getLength(); j++) {
         Element propEl = (Element) proplist.item(j);
-        
+
         if (!propEl.hasAttribute("key") || !propEl.hasAttribute("value")) {
           throw new Exception("Node " + id + " has property with no key or value");
         }
-        
+
         String key = propEl.getAttribute("key");
-        
+
         if (key.equals("maxHops") || key.equals("maxSec") || key.equals("teardown")) {
           throw new Exception("The following property can only be set in attributes: " + key);
         }
-        
+
         props.setProperty(key, propEl.getAttribute("value"));
       }
       localProps.put(id, props);
-      
+
       // parse edges of nodes
       AdjList edges = new AdjList();
       adjMap.put(id, edges);
@@ -544,13 +607,13 @@ public class Module extends Node {
       }
       for (int j = 0; j < edgelist.getLength(); j++) {
         Element edgeEl = (Element) edgelist.item(j);
-        
+
         String edgeID = edgeEl.getAttribute("id");
-        
+
         if (!edgeEl.hasAttribute("weight")) {
           throw new Exception("Edge with id=" + edgeID + " is missing weight");
         }
-        
+
         int weight = Integer.parseInt(edgeEl.getAttribute("weight"));
         edges.addEdge(edgeID, weight);
       }


[4/7] git commit: ACCUMULO-3257 Include the actual Node's name in log message.

Posted by el...@apache.org.
ACCUMULO-3257 Include the actual Node's name in log message.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1635ad5c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1635ad5c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1635ad5c

Branch: refs/heads/1.6
Commit: 1635ad5c763de77c793262f925e2f7f12b52b1c7
Parents: 1477c13
Author: Josh Elser <el...@apache.org>
Authored: Fri Oct 24 15:23:08 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Oct 24 15:24:47 2014 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/accumulo/test/randomwalk/Module.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1635ad5c/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
index 57b49ae..5756934 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java
@@ -294,13 +294,13 @@ public class Module extends Node {
             // Bound the time we'll wait for the node to complete
             nodeException = task.get(secondsRemaining, TimeUnit.SECONDS);
           } catch (InterruptedException e) {
-            log.warn("Interrupted waiting for RandomWalk node to complete. Exiting.", e);
+            log.warn("Interrupted waiting for " + nextNode.getClass().getSimpleName() + " to complete. Exiting.", e);
             break;
           } catch (ExecutionException e) {
-            log.error("Caught error executing RandomWalk node", e);
+            log.error("Caught error executing " + nextNode.getClass().getSimpleName(), e);
             throw e;
           } catch (TimeoutException e) {
-            log.info("Timed out waiting for RandomWalk node to complete (waited " + secondsRemaining + " seconds). Exiting.", e);
+            log.info("Timed out waiting for " + nextNode.getClass().getSimpleName() + " to complete (waited " + secondsRemaining + " seconds). Exiting.", e);
             break;
           }