You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/08/25 18:09:31 UTC

git commit: ACCUMULO-1628 made InMemoryMap iterator deepCopy work after delete

Repository: accumulo
Updated Branches:
  refs/heads/1.5.2-SNAPSHOT 345332b01 -> 30a0ca3e8


ACCUMULO-1628 made InMemoryMap iterator deepCopy work after delete


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/30a0ca3e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/30a0ca3e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/30a0ca3e

Branch: refs/heads/1.5.2-SNAPSHOT
Commit: 30a0ca3e81e5e27f870f8d5fad40f589a959feff
Parents: 345332b
Author: Keith Turner <kt...@apache.org>
Authored: Wed Jul 31 14:01:37 2013 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Aug 25 10:53:45 2014 -0400

----------------------------------------------------------------------
 .../system/SourceSwitchingIterator.java         |  2 +-
 .../server/tabletserver/InMemoryMap.java        | 62 ++++++++++++-------
 .../server/tabletserver/InMemoryMapTest.java    | 63 ++++++++++++++++----
 3 files changed, 94 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/30a0ca3e/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
index a4970dc..46d2007 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java
@@ -76,7 +76,7 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value
   }
   
   @Override
-  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+  public synchronized SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
     return new SourceSwitchingIterator(source.getDeepCopyDataSource(env), onlySwitchAfterRow, copies, iflag);
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/30a0ca3e/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
index 914cd85..43cf3c1 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/InMemoryMap.java
@@ -18,7 +18,6 @@ package org.apache.accumulo.server.tabletserver;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -382,14 +381,18 @@ public class InMemoryMap {
     
     boolean switched = false;
     private InterruptibleIterator iter;
-    private List<FileSKVIterator> readers;
+    private FileSKVIterator reader;
+    private MemoryDataSource parent;
+    private IteratorEnvironment env;
     
     MemoryDataSource() {
-      this(new ArrayList<FileSKVIterator>());
+      this(null, false, null);
     }
     
-    public MemoryDataSource(List<FileSKVIterator> readers) {
-      this.readers = readers;
+    public MemoryDataSource(MemoryDataSource parent, boolean switched, IteratorEnvironment env) {
+      this.parent = parent;
+      this.switched = switched;
+      this.env = env;
     }
     
     @Override
@@ -408,26 +411,42 @@ public class InMemoryMap {
       if (!isCurrent()) {
         switched = true;
         iter = null;
+        try {
+          // ensure files are referenced even if iterator was never seeked before
+          iterator();
+        } catch (IOException e) {
+          throw new RuntimeException();
+        }
       }
       
       return this;
     }
     
+    private synchronized FileSKVIterator getReader() throws IOException {
+      if (reader == null) {
+        Configuration conf = CachedConfiguration.getInstance();
+        FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
+        
+        reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
+      }
+
+      return reader;
+    }
+
     @Override
     public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
       if (iter == null)
         if (!switched)
           iter = map.skvIterator();
         else {
-          
-          Configuration conf = CachedConfiguration.getInstance();
-          FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf));
-          
-          FileSKVIterator reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, ServerConfiguration.getSiteConfiguration());
-
-          readers.add(reader);
-          
-          iter = new MemKeyConversionIterator(reader);
+          if (parent == null)
+            iter = new MemKeyConversionIterator(getReader());
+          else
+            synchronized (parent) {
+              // synchronize deep copy operation on parent, this prevents multiple threads from deep copying the rfile shared from parent its possible that the
+              // thread deleting an InMemoryMap and scan thread could be switching different deep copies
+              iter = new MemKeyConversionIterator(parent.getReader().deepCopy(env));
+            }
         }
       
       return iter;
@@ -435,7 +454,7 @@ public class InMemoryMap {
     
     @Override
     public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
-      return new MemoryDataSource(readers);
+      return new MemoryDataSource(parent == null ? this : parent, switched, env);
     }
     
   }
@@ -469,13 +488,12 @@ public class InMemoryMap {
       
       synchronized (this) {
         if (closed.compareAndSet(false, true)) {
-          
-          for (FileSKVIterator reader : mds.readers)
-            try {
-              reader.close();
-            } catch (IOException e) {
-              log.warn(e, e);
-            }
+          try {
+            if (mds.reader != null)
+              mds.reader.close();
+          } catch (IOException e) {
+            log.warn(e, e);
+          }
         }
       }
       

http://git-wip-us.apache.org/repos/asf/accumulo/blob/30a0ca3e/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java b/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
index 97c8eec..c905bb8 100644
--- a/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
+++ b/server/src/test/java/org/apache/accumulo/server/tabletserver/InMemoryMapTest.java
@@ -83,7 +83,7 @@ public class InMemoryMapTest extends TestCase {
   }
   
   public void test2() throws Exception {
-    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
     
     MemoryIterator ski1 = imm.skvIterator();
     mutate(imm, "r1", "foo:cq1", 3, "bar1");
@@ -100,7 +100,7 @@ public class InMemoryMapTest extends TestCase {
   }
   
   public void test3() throws Exception {
-    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
     
     mutate(imm, "r1", "foo:cq1", 3, "bar1");
     mutate(imm, "r1", "foo:cq1", 3, "bar2");
@@ -125,7 +125,7 @@ public class InMemoryMapTest extends TestCase {
   }
   
   public void test4() throws Exception {
-    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
     
     mutate(imm, "r1", "foo:cq1", 3, "bar1");
     mutate(imm, "r1", "foo:cq1", 3, "bar2");
@@ -156,7 +156,7 @@ public class InMemoryMapTest extends TestCase {
   }
   
   public void test5() throws Exception {
-    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
     
     mutate(imm, "r1", "foo:cq1", 3, "bar1");
     mutate(imm, "r1", "foo:cq1", 3, "bar2");
@@ -174,7 +174,7 @@ public class InMemoryMapTest extends TestCase {
     
     ski1.close();
     
-    imm = new InMemoryMap(false, "/tmp");
+    imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
     
     mutate(imm, "r1", "foo:cq1", 3, "bar1");
     mutate(imm, "r1", "foo:cq2", 3, "bar2");
@@ -194,7 +194,7 @@ public class InMemoryMapTest extends TestCase {
   }
   
   public void test6() throws Exception {
-    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
     
     mutate(imm, "r1", "foo:cq1", 3, "bar1");
     mutate(imm, "r1", "foo:cq2", 3, "bar2");
@@ -237,8 +237,51 @@ public class InMemoryMapTest extends TestCase {
     ski1.close();
   }
   
+  private void deepCopyAndDelete(int interleaving) throws Exception {
+    // interleaving == 0 intentionally omitted, this runs the test w/o deleting in mem map
+
+    InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
+    
+    mutate(imm, "r1", "foo:cq1", 3, "bar1");
+    mutate(imm, "r1", "foo:cq2", 3, "bar2");
+    
+    MemoryIterator ski1 = imm.skvIterator();
+    
+    if (interleaving == 1)
+      imm.delete(0);
+    
+    SortedKeyValueIterator<Key,Value> dc = ski1.deepCopy(null);
+
+    if (interleaving == 2)
+      imm.delete(0);
+
+    dc.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+    ski1.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+
+    if (interleaving == 3)
+      imm.delete(0);
+
+    ae(dc, "r1", "foo:cq1", 3, "bar1");
+    ae(ski1, "r1", "foo:cq1", 3, "bar1");
+    dc.seek(new Range(), LocalityGroupUtil.EMPTY_CF_SET, false);
+
+    if (interleaving == 4)
+      imm.delete(0);
+
+    ae(ski1, "r1", "foo:cq2", 3, "bar2");
+    ae(dc, "r1", "foo:cq1", 3, "bar1");
+    ae(dc, "r1", "foo:cq2", 3, "bar2");
+    assertFalse(dc.hasTop());
+    assertFalse(ski1.hasTop());
+  }
+
+  public void testDeepCopyAndDelete() throws Exception {
+    for (int i = 0; i <= 4; i++)
+      deepCopyAndDelete(i);
+  }
+
   public void testBug1() throws Exception {
-    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
     
     for (int i = 0; i < 20; i++) {
       mutate(imm, "r1", "foo:cq" + i, 3, "bar" + i);
@@ -265,7 +308,7 @@ public class InMemoryMapTest extends TestCase {
   }
   
   public void testSeekBackWards() throws Exception {
-    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
     
     mutate(imm, "r1", "foo:cq1", 3, "bar1");
     mutate(imm, "r1", "foo:cq2", 3, "bar2");
@@ -283,7 +326,7 @@ public class InMemoryMapTest extends TestCase {
   }
   
   public void testDuplicateKey() throws Exception {
-    InMemoryMap imm = new InMemoryMap(false, "/tmp");
+    InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
     
     Mutation m = new Mutation(new Text("r1"));
     m.put(new Text("foo"), new Text("cq"), 3, new Value("v1".getBytes()));
@@ -311,7 +354,7 @@ public class InMemoryMapTest extends TestCase {
     for (int threads : new int[] {1, 2, 16, /* 64, 256 */}) {
       final long now = System.currentTimeMillis();
       final long counts[] = new long[threads];
-      final InMemoryMap imm = new InMemoryMap(false, "/tmp");
+      final InMemoryMap imm = new InMemoryMap(false, System.getProperty("user.dir") + "/target");
       ExecutorService e = Executors.newFixedThreadPool(threads);
       for (int j = 0; j < threads; j++) {
         final int threadId = j;