You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/05/27 22:47:32 UTC

[04/17] accumulo git commit: ACCUMULO-3856 Ensure batchwriter gets closed in updateAndFlush

ACCUMULO-3856 Ensure batchwriter gets closed in updateAndFlush


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/11f108e2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/11f108e2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/11f108e2

Branch: refs/heads/master
Commit: 11f108e252f8358a3ac8b79843f1ebd77bee647e
Parents: 5db68da
Author: Josh Elser <el...@apache.org>
Authored: Wed May 27 16:19:32 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 27 16:19:32 2015 -0400

----------------------------------------------------------------------
 proxy/pom.xml                                   |   5 +
 .../org/apache/accumulo/proxy/ProxyServer.java  |  16 ++-
 .../apache/accumulo/proxy/ProxyServerTest.java  | 114 +++++++++++++++++++
 3 files changed, 131 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/11f108e2/proxy/pom.xml
----------------------------------------------------------------------
diff --git a/proxy/pom.xml b/proxy/pom.xml
index d66a329..b75935a 100644
--- a/proxy/pom.xml
+++ b/proxy/pom.xml
@@ -94,6 +94,11 @@
       <artifactId>zookeeper</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <profiles>
     <profile>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/11f108e2/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
index f873010..538fb03 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
@@ -1088,21 +1088,29 @@ public class ProxyServer implements AccumuloProxy.Iface {
   public void updateAndFlush(ByteBuffer login, String tableName, Map<ByteBuffer,List<ColumnUpdate>> cells)
       throws org.apache.accumulo.proxy.thrift.AccumuloException, org.apache.accumulo.proxy.thrift.AccumuloSecurityException,
       org.apache.accumulo.proxy.thrift.TableNotFoundException, org.apache.accumulo.proxy.thrift.MutationsRejectedException, TException {
+    BatchWriterPlusException bwpe = null;
     try {
-      BatchWriterPlusException bwpe = getWriter(login, tableName, null);
+      bwpe = getWriter(login, tableName, null);
       addCellsToWriter(cells, bwpe);
       if (bwpe.exception != null)
         throw bwpe.exception;
       bwpe.writer.flush();
-      bwpe.writer.close();
     } catch (Exception e) {
       handleExceptionMRE(e);
+    } finally {
+      if (null != bwpe) {
+        try {
+          bwpe.writer.close();
+        } catch (MutationsRejectedException e) {
+          handleExceptionMRE(e);
+        }
+      }
     }
   }
 
   private static final ColumnVisibility EMPTY_VIS = new ColumnVisibility();
 
-  private void addCellsToWriter(Map<ByteBuffer,List<ColumnUpdate>> cells, BatchWriterPlusException bwpe) {
+  void addCellsToWriter(Map<ByteBuffer,List<ColumnUpdate>> cells, BatchWriterPlusException bwpe) {
     if (bwpe.exception != null)
       return;
 
@@ -1217,7 +1225,7 @@ public class ProxyServer implements AccumuloProxy.Iface {
     return bwpe;
   }
 
-  private BatchWriterPlusException getWriter(ByteBuffer login, String tableName, WriterOptions opts) throws Exception {
+  BatchWriterPlusException getWriter(ByteBuffer login, String tableName, WriterOptions opts) throws Exception {
     BatchWriterConfig cfg = new BatchWriterConfig();
     if (opts != null) {
       if (opts.maxMemory != 0)

http://git-wip-us.apache.org/repos/asf/accumulo/blob/11f108e2/proxy/src/test/java/org/apache/accumulo/proxy/ProxyServerTest.java
----------------------------------------------------------------------
diff --git a/proxy/src/test/java/org/apache/accumulo/proxy/ProxyServerTest.java b/proxy/src/test/java/org/apache/accumulo/proxy/ProxyServerTest.java
new file mode 100644
index 0000000..ed4f313
--- /dev/null
+++ b/proxy/src/test/java/org/apache/accumulo/proxy/ProxyServerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.proxy;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.proxy.ProxyServer.BatchWriterPlusException;
+import org.apache.accumulo.proxy.thrift.ColumnUpdate;
+import org.apache.accumulo.proxy.thrift.WriterOptions;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class ProxyServerTest {
+
+  private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  @Test
+  public void updateAndFlushClosesWriterOnExceptionFromAddCells() throws Exception {
+    ProxyServer server = EasyMock.createMockBuilder(ProxyServer.class).addMockedMethod("getWriter", ByteBuffer.class, String.class, WriterOptions.class)
+        .addMockedMethod("addCellsToWriter", Map.class, BatchWriterPlusException.class).createMock();
+    BatchWriter writer = EasyMock.createMock(BatchWriter.class);
+    BatchWriterPlusException bwpe = new BatchWriterPlusException();
+    bwpe.writer = writer;
+    MutationsRejectedException mre = EasyMock.createMock(MutationsRejectedException.class);
+
+    final ByteBuffer login = ByteBuffer.wrap("my_login".getBytes(UTF_8));
+    final String tableName = "table1"; 
+    final Map<ByteBuffer,List<ColumnUpdate>> cells = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+
+    EasyMock.expect(server.getWriter(login, tableName, null)).andReturn(bwpe);
+    server.addCellsToWriter(cells, bwpe);
+    EasyMock.expectLastCall();
+
+    // Set the exception
+    bwpe.exception = mre;
+
+    writer.close();
+    EasyMock.expectLastCall();
+
+    EasyMock.replay(server, writer, mre);
+
+    try {
+      server.updateAndFlush(login, tableName, cells);
+      Assert.fail("Expected updateAndFlush to throw an exception");
+    } catch (org.apache.accumulo.proxy.thrift.MutationsRejectedException e) {
+      // pass
+    }
+
+    EasyMock.verify(server, writer, mre);
+  }
+
+  @Test
+  public void updateAndFlushClosesWriterOnExceptionFromFlush() throws Exception {
+    ProxyServer server = EasyMock.createMockBuilder(ProxyServer.class).addMockedMethod("getWriter", ByteBuffer.class, String.class, WriterOptions.class)
+        .addMockedMethod("addCellsToWriter", Map.class, BatchWriterPlusException.class).createMock();
+    BatchWriter writer = EasyMock.createMock(BatchWriter.class);
+    BatchWriterPlusException bwpe = new BatchWriterPlusException();
+    bwpe.writer = writer;
+    MutationsRejectedException mre = EasyMock.createMock(MutationsRejectedException.class);
+
+    final ByteBuffer login = ByteBuffer.wrap("my_login".getBytes(UTF_8));
+    final String tableName = "table1"; 
+    final Map<ByteBuffer,List<ColumnUpdate>> cells = new HashMap<ByteBuffer,List<ColumnUpdate>>();
+
+    EasyMock.expect(server.getWriter(login, tableName, null)).andReturn(bwpe);
+    server.addCellsToWriter(cells, bwpe);
+    EasyMock.expectLastCall();
+
+    // No exception throw adding the cells
+    bwpe.exception = null;
+
+    writer.flush();
+    EasyMock.expectLastCall().andThrow(mre);
+
+    writer.close();
+    EasyMock.expectLastCall();
+
+    EasyMock.replay(server, writer, mre);
+
+    try {
+      server.updateAndFlush(login, tableName, cells);
+      Assert.fail("Expected updateAndFlush to throw an exception");
+    } catch (org.apache.accumulo.proxy.thrift.MutationsRejectedException e) {
+      // pass
+    }
+
+    EasyMock.verify(server, writer, mre);
+  }
+  
+}