You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/07/17 14:58:20 UTC
[06/10] git commit: ACCUMULO-1000 added close method to conditional writer
ACCUMULO-1000 added close method to conditional writer
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7faaf11e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7faaf11e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7faaf11e
Branch: refs/heads/ACCUMULO-1000
Commit: 7faaf11ef42becef7f40d4b68135ee6d8fccafdb
Parents: 04457eb
Author: Keith Turner <kt...@apache.org>
Authored: Tue Jul 16 09:19:14 2013 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Tue Jul 16 09:19:14 2013 -0400
----------------------------------------------------------------------
.../accumulo/core/client/ConditionalWriter.java | 1 +
.../core/client/impl/ConditionalWriterImpl.java | 31 +++++++++++++++-----
.../accumulo/test/FaultyConditionalWriter.java | 5 ++++
.../accumulo/test/ConditionalWriterTest.java | 13 ++++++++
4 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7faaf11e/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index a23abcf..068e3e7 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -94,4 +94,5 @@ public interface ConditionalWriter {
*/
public long getTimeout(TimeUnit timeUnit);
+ public void close();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7faaf11e/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index 04a2753..3c6ac85 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -25,13 +25,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
@@ -86,9 +86,9 @@ class ConditionalWriterImpl implements ConditionalWriter {
private Map<String,BlockingQueue<TabletServerMutations>> serverQueues;
private DelayQueue<QCMutation> failedMutations = new DelayQueue<QCMutation>();
- private ScheduledExecutorService threadPool;
+ private ScheduledThreadPoolExecutor threadPool;
- private static class RQIterator implements Iterator<Result> {
+ private class RQIterator implements Iterator<Result> {
private BlockingQueue<Result> rq;
private int count;
@@ -105,9 +105,20 @@ class ConditionalWriterImpl implements ConditionalWriter {
@Override
public Result next() {
+ if (count <= 0)
+ throw new NoSuchElementException();
+
try {
// TODO maybe call drainTo after take to get a batch efficiently
- Result result = rq.take();
+ Result result = rq.poll(1, TimeUnit.SECONDS);
+ while (result == null) {
+
+ if (threadPool.isShutdown()) {
+ throw new NoSuchElementException("ConditionalWriter closed");
+ }
+
+ result = rq.poll(1, TimeUnit.SECONDS);
+ }
count--;
return result;
} catch (InterruptedException e) {
@@ -245,7 +256,8 @@ class ConditionalWriterImpl implements ConditionalWriter {
this.auths = authorizations;
this.ve = new VisibilityEvaluator(authorizations);
// TODO make configurable
- this.threadPool = Executors.newScheduledThreadPool(3);
+ this.threadPool = new ScheduledThreadPoolExecutor(3);
+ this.threadPool.setMaximumPoolSize(3);
this.locator = TabletLocator.getLocator(instance, new Text(tableId));
this.serverQueues = new HashMap<String,BlockingQueue<TabletServerMutations>>();
@@ -266,8 +278,6 @@ class ConditionalWriterImpl implements ConditionalWriter {
};
threadPool.scheduleAtFixedRate(failureHandler, 100, 100, TimeUnit.MILLISECONDS);
-
- // TODO need to shutdown thread pool
}
public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
@@ -500,4 +510,9 @@ class ConditionalWriterImpl implements ConditionalWriter {
throw new UnsupportedOperationException();
}
+ @Override
+ public void close() {
+ threadPool.shutdownNow();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7faaf11e/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
index 2874419..61c033a 100644
--- a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
+++ b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
@@ -82,4 +82,9 @@ public class FaultyConditionalWriter implements ConditionalWriter {
return cw.getTimeout(timeUnit);
}
+ @Override
+ public void close() {
+ cw.close();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7faaf11e/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
index 541f8dd..4bc7117 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
@@ -318,6 +318,8 @@ public class ConditionalWriterTest {
cm7.put("name", "first", cva, "john");
cm7.put("tx", "seq", cva, "1");
Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm7).getStatus());
+
+ cw.close();
}
@Test
@@ -347,6 +349,8 @@ public class ConditionalWriterTest {
Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
Assert.assertTrue(scanner.iterator().hasNext());
+
+ cw.close();
}
@@ -398,6 +402,8 @@ public class ConditionalWriterTest {
// TODO test conditions with different iterators
// TODO test w/ table that has iterators configured
+
+ cw.close();
}
@Test
@@ -502,6 +508,8 @@ public class ConditionalWriterTest {
scanner.clearColumns();
scanner.fetchColumn(new Text("name"), new Text("last"));
Assert.assertEquals("Doe", scanner.iterator().next().getValue().toString());
+
+ cw.close();
}
@Test
@@ -575,6 +583,8 @@ public class ConditionalWriterTest {
}
Assert.assertEquals(num, count);
+
+ cw.close();
}
@Test
@@ -656,6 +666,7 @@ public class ConditionalWriterTest {
Assert.assertEquals("1", iter.next().getValue().toString());
Assert.assertFalse(iter.hasNext());
+ cw.close();
}
@Test
@@ -707,6 +718,8 @@ public class ConditionalWriterTest {
Assert.assertEquals(1, accepted);
Assert.assertEquals(2, rejected);
Assert.assertEquals(3, total);
+
+ cw.close();
}
private SortedSet<Text> nss(String... splits) {