You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/02/13 20:39:18 UTC
svn commit: r1445880 - in /accumulo/branches/1.4/src/server/src:
main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
Author: ecn
Date: Wed Feb 13 19:39:18 2013
New Revision: 1445880
URL: http://svn.apache.org/r1445880
Log:
ACCUMULO-1062 serialize writes so that mutation counts stay consistent, rather than serializing each write's return with a wait/notify check on the counts; merge to 1.4 branch
Modified:
accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java?rev=1445880&r1=1445879&r2=1445880&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java Wed Feb 13 19:39:18 2013
@@ -260,6 +260,8 @@ public class InMemoryMap {
private AtomicInteger nextMutationCount = new AtomicInteger(1);
private AtomicInteger mutationCount = new AtomicInteger(0);
+
+ private Object writeSerializer = new Object();
/**
* Applies changes to a row in the InMemoryMap
@@ -267,25 +269,19 @@ public class InMemoryMap {
*/
public void mutate(List<Mutation> mutations) {
int mc = nextMutationCount.getAndAdd(mutations.size());
- try {
- map.mutate(mutations, mc);
- } finally {
- synchronized (this) {
- // Can not update mutationCount while writes that started before
- // are in progress, this would cause partial mutations to be seen.
- // Also, can not continue until mutation count is updated, because
- // a read may not see a successful write. Therefore writes must
- // wait for writes that started before to finish.
-
- while (mutationCount.get() != mc - 1) {
- try {
- wait();
- } catch (InterruptedException ex) {
- // ignored
- }
- }
+ int numKVs = 0;
+ // Can not update mutationCount while writes that started before
+ // are in progress, this would cause partial mutations to be seen.
+ // Also, can not continue until mutation count is updated, because
+ // a read may not see a successful write. Therefore writes must
+ // wait for writes that started before to finish.
+ //
+ // using separate lock from this map, to allow read/write in parallel
+ synchronized (writeSerializer ) {
+ try {
+ map.mutate(mutations, mc);
+ } finally {
mutationCount.set(mc + mutations.size() - 1);
- notifyAll();
}
}
}
Modified: accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java?rev=1445880&r1=1445879&r2=1445880&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java (original)
+++ accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java Wed Feb 13 19:39:18 2013
@@ -19,6 +19,10 @@ package org.apache.accumulo.server.table
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
@@ -38,6 +42,7 @@ import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Before;
+import org.junit.Test;
public class InMemoryMapTest extends TestCase {
@@ -259,4 +264,54 @@ public class InMemoryMapTest extends Tes
ski1.close();
}
+
+ private static final Logger log = Logger.getLogger(InMemoryMapTest.class);
+
+ static long sum(long[] counts) {
+ long result = 0;
+ for (int i = 0; i < counts.length; i++)
+ result += counts[i];
+ return result;
+ }
+
+ @Test
+ public void testParallelWriteSpeed() throws InterruptedException {
+ List<Double> timings = new ArrayList<Double>();
+ for (int threads: new int[]{1, 2, 16, 64, 256, 2048} ) {
+ final long now = System.currentTimeMillis();
+ final long counts[] = new long[threads];
+ final InMemoryMap imm = new InMemoryMap(false, "/tmp");
+ ExecutorService e = Executors.newFixedThreadPool(threads);
+ for (int j = 0; j < threads; j++) {
+ final int threadId = j;
+ e.execute(new Runnable() {
+ @Override
+ public void run() {
+ while (System.currentTimeMillis() - now < 1000) {
+ for (int k = 0; k < 1000; k++) {
+ Mutation m = new Mutation("row");
+ m.put("cf", "cq", new Value("v".getBytes()));
+ List<Mutation> mutations = Collections.singletonList(m);
+ imm.mutate(mutations);
+ counts[threadId]++;
+ }
+ }
+ }
+ });
+ }
+ e.shutdown();
+ e.awaitTermination(10, TimeUnit.SECONDS);
+ imm.delete(10000);
+ double mutationsPerSecond = sum(counts)/((System.currentTimeMillis() - now)/1000.);
+ timings.add(mutationsPerSecond);
+ log.info(String.format("%.1f mutations per second with %d threads", mutationsPerSecond, threads));
+ }
+ // verify that more threads doesn't go a lot faster, or a lot slower than one thread
+ for (int i = 0; i < timings.size(); i++) {
+ double ratioFirst = timings.get(0) / timings.get(i);
+ assertTrue(ratioFirst < 2);
+ assertTrue(ratioFirst > 0.5);
+ }
+ }
+
}