You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/17 14:58:20 UTC

[06/10] git commit: ACCUMULO-1000 added close method to conditional writer

ACCUMULO-1000 Added a close() method to ConditionalWriter that shuts down the writer's thread pool


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7faaf11e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7faaf11e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7faaf11e

Branch: refs/heads/ACCUMULO-1000
Commit: 7faaf11ef42becef7f40d4b68135ee6d8fccafdb
Parents: 04457eb
Author: Keith Turner <kt...@apache.org>
Authored: Tue Jul 16 09:19:14 2013 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Tue Jul 16 09:19:14 2013 -0400

----------------------------------------------------------------------
 .../accumulo/core/client/ConditionalWriter.java |  1 +
 .../core/client/impl/ConditionalWriterImpl.java | 31 +++++++++++++++-----
 .../accumulo/test/FaultyConditionalWriter.java  |  5 ++++
 .../accumulo/test/ConditionalWriterTest.java    | 13 ++++++++
 4 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7faaf11e/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index a23abcf..068e3e7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -94,4 +94,5 @@ public interface ConditionalWriter {
    */
   public long getTimeout(TimeUnit timeUnit);
 
+  public void close();
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7faaf11e/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index 04a2753..3c6ac85 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -25,13 +25,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -86,9 +86,9 @@ class ConditionalWriterImpl implements ConditionalWriter {
 
   private Map<String,BlockingQueue<TabletServerMutations>> serverQueues;
   private DelayQueue<QCMutation> failedMutations = new DelayQueue<QCMutation>();
-  private ScheduledExecutorService threadPool;
+  private ScheduledThreadPoolExecutor threadPool;
   
-  private static class RQIterator implements Iterator<Result> {
+  private class RQIterator implements Iterator<Result> {
     
     private BlockingQueue<Result> rq;
     private int count;
@@ -105,9 +105,20 @@ class ConditionalWriterImpl implements ConditionalWriter {
     
     @Override
     public Result next() {
+      if (count <= 0)
+        throw new NoSuchElementException();
+
       try {
         // TODO maybe call drainTo after take to get a batch efficiently
-        Result result = rq.take();
+        Result result = rq.poll(1, TimeUnit.SECONDS);
+        while (result == null) {
+          
+          if (threadPool.isShutdown()) {
+            throw new NoSuchElementException("ConditionalWriter closed");
+          }
+          
+          result = rq.poll(1, TimeUnit.SECONDS);
+        }
         count--;
         return result;
       } catch (InterruptedException e) {
@@ -245,7 +256,8 @@ class ConditionalWriterImpl implements ConditionalWriter {
     this.auths = authorizations;
     this.ve = new VisibilityEvaluator(authorizations);
     // TODO make configurable
-    this.threadPool = Executors.newScheduledThreadPool(3);
+    this.threadPool = new ScheduledThreadPoolExecutor(3);
+    this.threadPool.setMaximumPoolSize(3);
     this.locator = TabletLocator.getLocator(instance, new Text(tableId));
     this.serverQueues = new HashMap<String,BlockingQueue<TabletServerMutations>>();
     
@@ -266,8 +278,6 @@ class ConditionalWriterImpl implements ConditionalWriter {
     };
     
     threadPool.scheduleAtFixedRate(failureHandler, 100, 100, TimeUnit.MILLISECONDS);
-    
-    // TODO need to shutdown thread pool
   }
 
   public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
@@ -500,4 +510,9 @@ class ConditionalWriterImpl implements ConditionalWriter {
     throw new UnsupportedOperationException();
   }
   
+  @Override
+  public void close() {
+    threadPool.shutdownNow();
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7faaf11e/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
index 2874419..61c033a 100644
--- a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
+++ b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
@@ -82,4 +82,9 @@ public class FaultyConditionalWriter implements ConditionalWriter {
     return cw.getTimeout(timeUnit);
   }
   
+  @Override
+  public void close() {
+    cw.close();
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7faaf11e/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
index 541f8dd..4bc7117 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
@@ -318,6 +318,8 @@ public class ConditionalWriterTest {
     cm7.put("name", "first", cva, "john");
     cm7.put("tx", "seq", cva, "1");
     Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm7).getStatus());
+    
+    cw.close();
   }
   
   @Test
@@ -347,6 +349,8 @@ public class ConditionalWriterTest {
     
     Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
     Assert.assertTrue(scanner.iterator().hasNext());
+    
+    cw.close();
 
   }
 
@@ -398,6 +402,8 @@ public class ConditionalWriterTest {
     
     // TODO test conditions with different iterators
     // TODO test w/ table that has iterators configured
+    
+    cw.close();
   }
 
   @Test
@@ -502,6 +508,8 @@ public class ConditionalWriterTest {
     scanner.clearColumns();
     scanner.fetchColumn(new Text("name"), new Text("last"));
     Assert.assertEquals("Doe", scanner.iterator().next().getValue().toString());
+    
+    cw.close();
   }
   
   @Test
@@ -575,6 +583,8 @@ public class ConditionalWriterTest {
     }
     
     Assert.assertEquals(num, count);
+    
+    cw.close();
   }
   
   @Test
@@ -656,6 +666,7 @@ public class ConditionalWriterTest {
     Assert.assertEquals("1", iter.next().getValue().toString());
     Assert.assertFalse(iter.hasNext());
 
+    cw.close();
   }
   
   @Test
@@ -707,6 +718,8 @@ public class ConditionalWriterTest {
     Assert.assertEquals(1, accepted);
     Assert.assertEquals(2, rejected);
     Assert.assertEquals(3, total);
+    
+    cw.close();
   }
 
   private SortedSet<Text> nss(String... splits) {