You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/08/25 18:09:31 UTC
git commit: ACCUMULO-1628 made InMemoryMap iterator deepCopy work
after delete
Repository: accumulo
Updated Branches:
refs/heads/1.5.2-SNAPSHOT 345332b01 -> 30a0ca3e8
ACCUMULO-1628 made InMemoryMap iterator deepCopy work after delete
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/30a0ca3e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/30a0ca3e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/30a0ca3e
Branch: refs/heads/1.5.2-SNAPSHOT
Commit: 30a0ca3e81e5e27f870f8d5fad40f589a959feff
Parents: 345332b
Author: Keith Turner <kt...@apache.org>
Authored: Wed Jul 31 14:01:37 2013 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Aug 25 10:53:45 2014 -0400
----------------------------------------------------------------------
.../system/SourceSwitchingIterator.java | 2 +-
.../server/tabletserver/InMemoryMap.java | 62 ++++++++++++-------
.../server/tabletserver/InMemoryMapTest.java | 63 ++++++++++++++++----
3 files changed, 94 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/30a0ca3e/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
index a4970dc..46d2007 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
@@ -76,7 +76,7 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
}
@Override
- public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ public synchronized SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
return new SourceSwitchingIterator(source.getDeepCopyDataSource(env), onlySwitchAfterRow, copies, iflag);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/30a0ca3e/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
index 914cd85..43cf3c1 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
@@ -18,7 +18,6 @@ package org.apache.accumulo.server.tabletserver;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -382,14 +381,18 @@ public class InMemoryMap {
boolean switched = false;
private InterruptibleIterator iter;
- private List<FileSKVIterator> readers;
+ private FileSKVIterator reader;
+ private MemoryDataSource parent;
+ private IteratorEnvironment env;
MemoryDataSource() {
- this(new ArrayList<FileSKVIterator>());
+ this(null, false, null);
}
- public MemoryDataSource(List<FileSKVIterator> readers) {
- this.readers = readers;
+ public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env) {
+ this.parent = parent;
+ this.switched = switched;
+ this.env = env;
}
@Override
@@ -408,26 +411,42 @@ public class InMemoryMap {
if (!isCurrent()) {
switched = true;
iter = null;
+ try {
+ // ensure files are referenced even if iterator was never seeked before
+ iterator();
+ } catch (IOException e) {
+ throw new RuntimeException();
+ }
}
return this;
}
+ private synchronized FileSKVIterator getReader() throws IOException {
+ if (reader == null) {
+ Configuration conf = CachedConfiguration.getInstance();
+ FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
+
+ reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
+ }
+
+ return reader;
+ }
+
@Override
public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
if (iter == null)
if (!switched)
iter = map.skvIterator();
else {
-
- Configuration conf = CachedConfiguration.getInstance();
- FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
-
- FileSKVIterator reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
-
- readers.add(reader);
-
- iter = new MemKeyConversionIterator(reader);
+ if (parent == null)
+ iter = new MemKeyConversionIterator(getReader());
+ else
+ synchronized (parent) {
+ // Synchronize the deep copy operation on the parent; this prevents multiple threads from deep copying the rfile shared from the parent at once. It is possible that the
+ // thread deleting an InMemoryMap and a scan thread could be switching different deep copies at the same time.
+ iter = new MemKeyConversionIterator(parent.getReader().deepCopy(env));
+ }
}
return iter;
@@ -435,7 +454,7 @@ public class InMemoryMap {
@Override
public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
- return new MemoryDataSource(readers);
+ return new MemoryDataSource(parent == null ? this : parent, switched, env);
}
}
@@ -469,13 +488,12 @@ public class InMemoryMap {
synchronized (this) {
if (closed.compareAndSet(false, true)) {
-
- for (FileSKVIterator reader : mds.readers)
- try {
- reader.close();
- } catch (IOException e) {
- log.warn(e, e);
- }
+ try {
+ if (mds.reader != null)
+ mds.reader.close();
+ } catch (IOException e) {
+ log.warn(e, e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/30a0ca3e/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java b/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
index 97c8eec..c905bb8 100644
--- a/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
+++ b/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
@@ -83,7 +83,7 @@ public class InMemoryMapTest extends TestCase {
}
public void test2() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, "/tmp");
+ InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
MemoryIterator ski1 = imm.skvIterator();
mutate(imm, "r1", "foo:cq1", 3, "bar1");
@@ -100,7 +100,7 @@ public class InMemoryMapTest extends TestCase {
}
public void test3() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, "/tmp");
+ InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
mutate(imm, "r1", "foo:cq1", 3, "bar1");
mutate(imm, "r1", "foo:cq1", 3, "bar2");
@@ -125,7 +125,7 @@ public class InMemoryMapTest extends TestCase {
}
public void test4() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, "/tmp");
+ InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
mutate(imm, "r1", "foo:cq1", 3, "bar1");
mutate(imm, "r1", "foo:cq1", 3, "bar2");
@@ -156,7 +156,7 @@ public class InMemoryMapTest extends TestCase {
}
public void test5() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, "/tmp");
+ InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
mutate(imm, "r1", "foo:cq1", 3, "bar1");
mutate(imm, "r1", "foo:cq1", 3, "bar2");
@@ -174,7 +174,7 @@ public class InMemoryMapTest extends TestCase {
ski1.close();
- imm = new InMemoryMap(false, "/tmp");
+ imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
mutate(imm, "r1", "foo:cq1", 3, "bar1");
mutate(imm, "r1", "foo:cq2", 3, "bar2");
@@ -194,7 +194,7 @@ public class InMemoryMapTest extends TestCase {
}
public void test6() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, "/tmp");
+ InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
mutate(imm, "r1", "foo:cq1", 3, "bar1");
mutate(imm, "r1", "foo:cq2", 3, "bar2");
@@ -237,8 +237,51 @@ public class InMemoryMapTest extends TestCase {
ski1.close();
}
+ private void deepCopyAndDelete(int interleaving) throws Exception {
+ // interleaving == 0 is intentionally not handled below; that case runs the test without deleting the in-memory map
+
+ InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
+
+ mutate(imm, "r1", "foo:cq1", 3, "bar1");
+ mutate(imm, "r1", "foo:cq2", 3, "bar2");
+
+ MemoryIterator ski1 = imm.skvIterator();
+
+ if (interleaving == 1)
+ imm.delete(0);
+
+ SortedKeyValueIterator<Key,Value> dc = ski1.deepCopy(null);
+
+ if (interleaving == 2)
+ imm.delete(0);
+
+ dc.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+ ski1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+
+ if (interleaving == 3)
+ imm.delete(0);
+
+ ae(dc, "r1", "foo:cq1", 3, "bar1");
+ ae(ski1, "r1", "foo:cq1", 3, "bar1");
+ dc.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+
+ if (interleaving == 4)
+ imm.delete(0);
+
+ ae(ski1, "r1", "foo:cq2", 3, "bar2");
+ ae(dc, "r1", "foo:cq1", 3, "bar1");
+ ae(dc, "r1", "foo:cq2", 3, "bar2");
+ assertFalse(dc.hasTop());
+ assertFalse(ski1.hasTop());
+ }
+
+ public void testDeepCopyAndDelete() throws Exception {
+ for (int i = 0; i <= 4; i++)
+ deepCopyAndDelete(i);
+ }
+
public void testBug1() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, "/tmp");
+ InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
for (int i = 0; i < 20; i++) {
mutate(imm, "r1", "foo:cq" + i, 3, "bar" + i);
@@ -265,7 +308,7 @@ public class InMemoryMapTest extends TestCase {
}
public void testSeekBackWards() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, "/tmp");
+ InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
mutate(imm, "r1", "foo:cq1", 3, "bar1");
mutate(imm, "r1", "foo:cq2", 3, "bar2");
@@ -283,7 +326,7 @@ public class InMemoryMapTest extends TestCase {
}
public void testDuplicateKey() throws Exception {
- InMemoryMap imm = new InMemoryMap(false, "/tmp");
+ InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
Mutation m = new Mutation(new Text("r1"));
m.put(new Text("foo"), new Text("cq"), 3, new Value("v1".getBytes()));
@@ -311,7 +354,7 @@ public class InMemoryMapTest extends TestCase {
for (int threads : new int[] {1, 2, 16, /* 64, 256 */}) {
final long now = System.currentTimeMillis();
final long counts[] = new long[threads];
- final InMemoryMap imm = new InMemoryMap(false, "/tmp");
+ final InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
ExecutorService e = Executors.newFixedThreadPool(threads);
for (int j = 0; j < threads; j++) {
final int threadId = j;