You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/05/27 22:47:32 UTC
[04/17] accumulo git commit: ACCUMULO-3856 Ensure batchwriter gets closed in updateAndFlush
ACCUMULO-3856 Ensure batchwriter gets closed in updateAndFlush
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/11f108e2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/11f108e2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/11f108e2
Branch: refs/heads/master
Commit: 11f108e252f8358a3ac8b79843f1ebd77bee647e
Parents: 5db68da
Author: Josh Elser <el...@apache.org>
Authored: Wed May 27 16:19:32 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 27 16:19:32 2015 -0400
----------------------------------------------------------------------
proxy/pom.xml | 5 +
.../org/apache/accumulo/proxy/ProxyServer.java | 16 ++-
.../apache/accumulo/proxy/ProxyServerTest.java | 114 +++++++++++++++++++
3 files changed, 131 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11f108e2/proxy/pom.xml
----------------------------------------------------------------------
diff --git a/proxy/pom.xml b/proxy/pom.xml
index d66a329..b75935a 100644
--- a/proxy/pom.xml
+++ b/proxy/pom.xml
@@ -94,6 +94,11 @@
<artifactId>zookeeper</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
<profile>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11f108e2/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
index f873010..538fb03 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
@@ -1088,21 +1088,29 @@ public class ProxyServer implements AccumuloProxy.Iface {
public void updateAndFlush(ByteBuffer login, String tableName, Map<ByteBuffer,List<ColumnUpdate>> cells)
throws org.apache.accumulo.proxy.thrift.AccumuloException, org.apache.accumulo.proxy.thrift.AccumuloSecurityException,
org.apache.accumulo.proxy.thrift.TableNotFoundException, org.apache.accumulo.proxy.thrift.MutationsRejectedException, TException {
+ BatchWriterPlusException bwpe = null;
try {
- BatchWriterPlusException bwpe = getWriter(login, tableName, null);
+ bwpe = getWriter(login, tableName, null);
addCellsToWriter(cells, bwpe);
if (bwpe.exception != null)
throw bwpe.exception;
bwpe.writer.flush();
- bwpe.writer.close();
} catch (Exception e) {
handleExceptionMRE(e);
+ } finally {
+ if (null != bwpe) {
+ try {
+ bwpe.writer.close();
+ } catch (MutationsRejectedException e) {
+ handleExceptionMRE(e);
+ }
+ }
}
}
private static final ColumnVisibility EMPTY_VIS = new ColumnVisibility();
- private void addCellsToWriter(Map<ByteBuffer,List<ColumnUpdate>> cells, BatchWriterPlusException bwpe) {
+ void addCellsToWriter(Map<ByteBuffer,List<ColumnUpdate>> cells, BatchWriterPlusException bwpe) {
if (bwpe.exception != null)
return;
@@ -1217,7 +1225,7 @@ public class ProxyServer implements AccumuloProxy.Iface {
return bwpe;
}
- private BatchWriterPlusException getWriter(ByteBuffer login, String tableName, WriterOptions opts) throws Exception {
+ BatchWriterPlusException getWriter(ByteBuffer login, String tableName, WriterOptions opts) throws Exception {
BatchWriterConfig cfg = new BatchWriterConfig();
if (opts != null) {
if (opts.maxMemory != 0)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/11f108e2/proxy/src/test/java/org/apache/accumulo/proxy/ProxyServerTest.java
----------------------------------------------------------------------
diff --git a/proxy/src/test/java/org/apache/accumulo/proxy/ProxyServerTest.java b/proxy/src/test/java/org/apache/accumulo/proxy/ProxyServerTest.java
new file mode 100644
index 0000000..ed4f313
--- /dev/null
+++ b/proxy/src/test/java/org/apache/accumulo/proxy/ProxyServerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.proxy;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.proxy.ProxyServer.BatchWriterPlusException;
+import org.apache.accumulo.proxy.thrift.ColumnUpdate;
+import org.apache.accumulo.proxy.thrift.WriterOptions;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class ProxyServerTest {
+
+ private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+ @Test
+ public void updateAndFlushClosesWriterOnExceptionFromAddCells() throws Exception {
+ ProxyServer server = EasyMock.createMockBuilder(ProxyServer.class).addMockedMethod("getWriter", ByteBuffer.class, String.class, WriterOptions.class)
+ .addMockedMethod("addCellsToWriter", Map.class, BatchWriterPlusException.class).createMock();
+ BatchWriter writer = EasyMock.createMock(BatchWriter.class);
+ BatchWriterPlusException bwpe = new BatchWriterPlusException();
+ bwpe.writer = writer;
+ MutationsRejectedException mre = EasyMock.createMock(MutationsRejectedException.class);
+
+ final ByteBuffer login = ByteBuffer.wrap("my_login".getBytes(UTF_8));
+ final String tableName = "table1";
+ final Map<ByteBuffer,List<ColumnUpdate>> cells = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+
+ EasyMock.expect(server.getWriter(login, tableName, null)).andReturn(bwpe);
+ server.addCellsToWriter(cells, bwpe);
+ EasyMock.expectLastCall();
+
+ // Set the exception
+ bwpe.exception = mre;
+
+ writer.close();
+ EasyMock.expectLastCall();
+
+ EasyMock.replay(server, writer, mre);
+
+ try {
+ server.updateAndFlush(login, tableName, cells);
+ Assert.fail("Expected updateAndFlush to throw an exception");
+ } catch (org.apache.accumulo.proxy.thrift.MutationsRejectedException e) {
+ // pass
+ }
+
+ EasyMock.verify(server, writer, mre);
+ }
+
+ @Test
+ public void updateAndFlushClosesWriterOnExceptionFromFlush() throws Exception {
+ ProxyServer server = EasyMock.createMockBuilder(ProxyServer.class).addMockedMethod("getWriter", ByteBuffer.class, String.class, WriterOptions.class)
+ .addMockedMethod("addCellsToWriter", Map.class, BatchWriterPlusException.class).createMock();
+ BatchWriter writer = EasyMock.createMock(BatchWriter.class);
+ BatchWriterPlusException bwpe = new BatchWriterPlusException();
+ bwpe.writer = writer;
+ MutationsRejectedException mre = EasyMock.createMock(MutationsRejectedException.class);
+
+ final ByteBuffer login = ByteBuffer.wrap("my_login".getBytes(UTF_8));
+ final String tableName = "table1";
+ final Map<ByteBuffer,List<ColumnUpdate>> cells = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+
+ EasyMock.expect(server.getWriter(login, tableName, null)).andReturn(bwpe);
+ server.addCellsToWriter(cells, bwpe);
+ EasyMock.expectLastCall();
+
+ // No exception is thrown while adding the cells
+ bwpe.exception = null;
+
+ writer.flush();
+ EasyMock.expectLastCall().andThrow(mre);
+
+ writer.close();
+ EasyMock.expectLastCall();
+
+ EasyMock.replay(server, writer, mre);
+
+ try {
+ server.updateAndFlush(login, tableName, cells);
+ Assert.fail("Expected updateAndFlush to throw an exception");
+ } catch (org.apache.accumulo.proxy.thrift.MutationsRejectedException e) {
+ // pass
+ }
+
+ EasyMock.verify(server, writer, mre);
+ }
+
+}