You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/11/15 01:39:18 UTC

git commit: ACCUMULO-1614 use zipfian dist in conditional RW so there are collisions as data size increases

Updated Branches:
  refs/heads/1.6.0-SNAPSHOT feb0f3151 -> 484053a2f


ACCUMULO-1614 use a Zipfian distribution in the conditional random walk test so that there are still collisions as data size increases


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/484053a2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/484053a2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/484053a2

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 484053a2f0823706e0a386202da75838f19d22d1
Parents: feb0f31
Author: Keith Turner <kt...@apache.org>
Authored: Thu Nov 14 19:37:12 2013 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Thu Nov 14 19:37:12 2013 -0500

----------------------------------------------------------------------
 .../test/randomwalk/conditional/Init.java       | 28 +++++++++++++++++---
 .../test/randomwalk/conditional/Setup.java      | 10 ++++---
 .../test/randomwalk/conditional/Transfer.java   | 15 ++++++++---
 .../randomwalk/conf/modules/Conditional.xml     | 14 +++++-----
 4 files changed, 49 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/484053a2/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Init.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Init.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Init.java
index c336e97..e3de7d8 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Init.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Init.java
@@ -16,13 +16,19 @@
  */
 package org.apache.accumulo.test.randomwalk.conditional;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Properties;
+import java.util.Random;
+import java.util.TreeSet;
 
 import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Status;
 import org.apache.accumulo.core.data.Condition;
 import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.accumulo.test.randomwalk.Test;
+import org.apache.hadoop.io.Text;
 
 /**
  * 
@@ -34,10 +40,25 @@ public class Init extends Test {
 
     int numBanks = (Integer) state.get("numBanks");
     int numAccts = (Integer) state.get("numAccts");
+
+    // add some splits to spread ingest out a little
+    TreeSet<Text> splits = new TreeSet<Text>();
+    for (int i = 1; i < 10; i++)
+      splits.add(new Text(Utils.getBank((int) (numBanks * .1 * i))));
+    state.getConnector().tableOperations().addSplits((String) state.get("tableName"), splits);
+    log.debug("Added splits " + splits);
+
+    ArrayList<Integer> banks = new ArrayList<Integer>();
+    for (int i = 0; i < numBanks; i++)
+      banks.add(i);
+    // shuffle for case when multiple threads are adding banks
+    Collections.shuffle(banks, (Random) state.get("rand"));
+
     ConditionalWriter cw = (ConditionalWriter) state.get("cw");
 
-    for (int i = 0; i < numBanks; i++) {
+    for (int i : banks) {
       ConditionalMutation m = null;
+      int acceptedCount = 0;
       for (int j = 0; j < numAccts; j++) {
         String cf = Utils.getAccount(j);
         if (m == null) {
@@ -49,7 +70,8 @@ public class Init extends Test {
         m.put(cf, "seq", Utils.getSeq(0));
 
         if (j % 1000 == 0) {
-          cw.write(m);
+          if (cw.write(m).getStatus() == Status.ACCEPTED)
+            acceptedCount++;
           m = null;
         }
 
@@ -57,7 +79,7 @@ public class Init extends Test {
       if (m != null)
         cw.write(m);
 
-      log.debug("Added bank " + Utils.getBank(i));
+      log.debug("Added bank " + Utils.getBank(i) + " " + acceptedCount);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/484053a2/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Setup.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Setup.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Setup.java
index 2f39e29..0aa36c4 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Setup.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Setup.java
@@ -22,6 +22,7 @@ import java.util.Random;
 import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.client.ConditionalWriterConfig;
 import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.accumulo.test.randomwalk.Test;
 
@@ -32,11 +33,11 @@ public class Setup extends Test {
     Random rand = new Random();
     state.set("rand", rand);
     
-    int numBanks = Integer.parseInt(props.getProperty("numBanks", "10"));
+    int numBanks = Integer.parseInt(props.getProperty("numBanks", "1000"));
     log.debug("numBanks = " + numBanks);
     state.set("numBanks", numBanks);
 
-    int numAccts = Integer.parseInt(props.getProperty("numAccts", "1000"));
+    int numAccts = Integer.parseInt(props.getProperty("numAccts", "10000"));
     log.debug("numAccts = " + numAccts);
     state.set("numAccts", numAccts);
 
@@ -46,11 +47,12 @@ public class Setup extends Test {
     try {
       state.getConnector().tableOperations().create(tableName);
       log.debug("created table " + tableName);
+      boolean blockCache = rand.nextBoolean();
+      state.getConnector().tableOperations().setProperty(tableName, Property.TABLE_BLOCKCACHE_ENABLED.getKey(), blockCache + "");
+      log.debug("set " + Property.TABLE_BLOCKCACHE_ENABLED.getKey() + " " + blockCache);
     } catch (TableExistsException tee) {}
 
 
-
-
     ConditionalWriter cw = state.getConnector()
         .createConditionalWriter(tableName, new ConditionalWriterConfig().setMaxWriteThreads(1));
     state.set("cw", cw);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/484053a2/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java
index 70aa3dd..93f0d55 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.test.randomwalk.State;
 import org.apache.accumulo.test.randomwalk.Test;
+import org.apache.commons.math.distribution.ZipfDistributionImpl;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -64,11 +65,17 @@ public class Transfer extends Test {
     Connector conn = state.getConnector();
 
     int numAccts = (Integer) state.get("numAccts");
-    String bank = Utils.getBank(rand.nextInt((Integer) state.get("numBanks")));
-    String acct1 = Utils.getAccount(rand.nextInt(numAccts));
-    String acct2 = Utils.getAccount(rand.nextInt(numAccts));
-    while (acct2.equals(acct1))
+    // note: non integer exponents are slow
+
+    ZipfDistributionImpl zdiBanks = new ZipfDistributionImpl((Integer) state.get("numBanks"), 1);
+    String bank = Utils.getBank(zdiBanks.inverseCumulativeProbability(rand.nextDouble()));
+    ZipfDistributionImpl zdiAccts = new ZipfDistributionImpl(numAccts, 1);
+    String acct1 = Utils.getAccount(zdiAccts.inverseCumulativeProbability(rand.nextDouble()));
+    String acct2 = Utils.getAccount(zdiAccts.inverseCumulativeProbability(rand.nextDouble()));
+    while (acct2.equals(acct1)) {
+      // intentionally not using zipf distribution to pick on retry
       acct2 = Utils.getAccount(rand.nextInt(numAccts));
+    }
 
     // TODO document how data should be read when using ConditionalWriter
     Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/484053a2/test/system/randomwalk/conf/modules/Conditional.xml
----------------------------------------------------------------------
diff --git a/test/system/randomwalk/conf/modules/Conditional.xml b/test/system/randomwalk/conf/modules/Conditional.xml
index d33d36c..54ff7ab 100644
--- a/test/system/randomwalk/conf/modules/Conditional.xml
+++ b/test/system/randomwalk/conf/modules/Conditional.xml
@@ -23,17 +23,17 @@
 <init id="ct.Setup"/>
 
 <node id="dummy.ToAll">
-  <edge id="ct.Compact" weight="100"/>
-  <edge id="ct.Flush" weight="100"/>
-  <edge id="ct.Merge" weight="100"/>
-  <edge id="ct.Split" weight="100"/>
+  <edge id="ct.Compact" weight="1"/>
+  <edge id="ct.Flush" weight="1"/>
+  <edge id="ct.Merge" weight="1"/>
+  <edge id="ct.Split" weight="1"/>
   <edge id="ct.Transfer" weight="100000"/>
-  <edge id="ct.Verify" weight="500"/>
+  <edge id="ct.Verify" weight="2"/>
 </node>
 
 <node id="ct.Setup">
-  <property key="numAccts" value="1000"/>
-  <property key="numBanks" value="10"/>
+  <property key="numAccts" value="10000"/>
+  <property key="numBanks" value="1000"/>
   <edge id="ct.Init" weight="1"/>
 </node>