You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/02/13 20:39:18 UTC

svn commit: r1445880 - in /accumulo/branches/1.4/src/server/src: main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java

Author: ecn
Date: Wed Feb 13 19:39:18 2013
New Revision: 1445880

URL: http://svn.apache.org/r1445880
Log:
ACCUMULO-1062 serialize writes to ensure counts, rather than serializing the return using check/notify for counts; merge to 1.4 branch

Modified:
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
    accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java?rev=1445880&r1=1445879&r2=1445880&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java Wed Feb 13 19:39:18 2013
@@ -260,6 +260,8 @@ public class InMemoryMap {
   
   private AtomicInteger nextMutationCount = new AtomicInteger(1);
   private AtomicInteger mutationCount = new AtomicInteger(0);
+
+  private Object writeSerializer = new Object();
   
   /**
    * Applies changes to a row in the InMemoryMap
@@ -267,25 +269,19 @@ public class InMemoryMap {
    */
   public void mutate(List<Mutation> mutations) {
     int mc = nextMutationCount.getAndAdd(mutations.size());
-    try {
-      map.mutate(mutations, mc);
-    } finally {
-      synchronized (this) {
-        // Can not update mutationCount while writes that started before
-        // are in progress, this would cause partial mutations to be seen.
-        // Also, can not continue until mutation count is updated, because
-        // a read may not see a successful write. Therefore writes must
-        // wait for writes that started before to finish.
-        
-        while (mutationCount.get() != mc - 1) {
-          try {
-            wait();
-          } catch (InterruptedException ex) {
-            // ignored
-          }
-        }
+    int numKVs = 0;
+    // Cannot update mutationCount while writes that started earlier
+    // are in progress; this would cause partial mutations to be seen.
+    // Also, cannot continue until mutationCount is updated, because
+    // a read might not see a successful write. Therefore writes must
+    // wait for writes that started earlier to finish.
+    //
+    // Use a lock separate from this map, to allow reads and writes in parallel.
+    synchronized (writeSerializer ) {
+      try {
+        map.mutate(mutations, mc);
+      } finally {
         mutationCount.set(mc + mutations.size() - 1);
-        notifyAll();
       }
     }
   }

Modified: accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java?rev=1445880&r1=1445879&r2=1445880&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java (original)
+++ accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java Wed Feb 13 19:39:18 2013
@@ -19,6 +19,10 @@ package org.apache.accumulo.server.table
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
 
@@ -38,6 +42,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.junit.Before;
+import org.junit.Test;
 
 public class InMemoryMapTest extends TestCase {
   
@@ -259,4 +264,54 @@ public class InMemoryMapTest extends Tes
     
     ski1.close();
   }
+  
+  private static final Logger log = Logger.getLogger(InMemoryMapTest.class);
+
+  static long sum(long[] counts) {
+    long result = 0;
+    for (int i = 0; i < counts.length; i++) 
+      result  += counts[i];
+    return result;
+  }
+  
+  @Test
+  public void testParallelWriteSpeed() throws InterruptedException {
+    List<Double> timings = new ArrayList<Double>();
+    for (int threads: new int[]{1, 2, 16, 64, 256, 2048} ) {
+      final long now = System.currentTimeMillis();
+      final long counts[] = new long[threads];
+      final InMemoryMap imm = new InMemoryMap(false, "/tmp");
+      ExecutorService e = Executors.newFixedThreadPool(threads);
+      for (int j = 0; j < threads; j++) {
+        final int threadId = j;
+        e.execute(new Runnable() {
+          @Override
+          public void run() {
+            while (System.currentTimeMillis() - now < 1000) {
+              for (int k = 0; k < 1000; k++) {
+                Mutation m = new Mutation("row");
+                m.put("cf", "cq", new Value("v".getBytes()));
+                List<Mutation> mutations = Collections.singletonList(m);
+                imm.mutate(mutations);
+                counts[threadId]++;
+              }
+            }
+          }
+        });
+      }
+      e.shutdown();
+      e.awaitTermination(10, TimeUnit.SECONDS);
+      imm.delete(10000);
+      double mutationsPerSecond = sum(counts)/((System.currentTimeMillis() - now)/1000.);
+      timings.add(mutationsPerSecond);
+      log.info(String.format("%.1f mutations per second with %d threads", mutationsPerSecond, threads));
+    }
+    // verify that using more threads is neither a lot faster nor a lot slower than using one thread
+    for (int i = 0; i < timings.size(); i++) {
+      double ratioFirst = timings.get(0) / timings.get(i); 
+      assertTrue(ratioFirst < 2);
+      assertTrue(ratioFirst > 0.5);
+    }
+  }
+
 }