You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/02/24 12:33:44 UTC

[hbase] branch master updated: HBASE-21944 Validate put for batch operation

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new aa7d3ce  HBASE-21944 Validate put for batch operation
aa7d3ce is described below

commit aa7d3ce39fa7f477f025343b6d9cca139c462292
Author: zhangduo <zh...@apache.org>
AuthorDate: Sun Feb 24 18:18:33 2019 +0800

    HBASE-21944 Validate put for batch operation
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../hadoop/hbase/client/ConnectionUtils.java       |   3 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |   5 +-
 .../apache/hadoop/hbase/client/TestAsyncTable.java | 185 +++++++++++----------
 .../hadoop/hbase/client/TestAsyncTableBatch.java   |  29 +++-
 4 files changed, 126 insertions(+), 96 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 1940948..fea7a1e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -596,8 +596,7 @@ public final class ConnectionUtils {
    * The rules are:
    * <ol>
    * <li>If user set a priority explicitly, then just use it.</li>
-   * <li>For meta table, use {@link HConstants#META_QOS}.</li>
-   * <li>For other system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
+   * <li>For system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
    * <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li>
    * </ol>
    * @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 789460c..1925c0e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -484,9 +484,6 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
 
   @Override
   public List<CompletableFuture<Void>> put(List<Put> puts) {
-    for (Put put : puts) {
-      validatePut(put, conn.connConf.getMaxKeyValueSize());
-    }
     return voidMutate(puts);
   }
 
@@ -506,6 +503,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   }
 
   private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
+    actions.stream().filter(action -> action instanceof Put).map(action -> (Put) action)
+      .forEach(put -> validatePut(put, conn.connConf.getMaxKeyValueSize()));
     return conn.callerFactory.batch().table(tableName).actions(actions)
       .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
       .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index 89ebf8d..63080b9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -69,7 +69,7 @@ public class TestAsyncTable {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncTable.class);
+    HBaseClassTestRule.forClass(TestAsyncTable.class);
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -81,6 +81,8 @@ public class TestAsyncTable {
 
   private static byte[] VALUE = Bytes.toBytes("value");
 
+  private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
+
   private static AsyncConnection ASYNC_CONN;
 
   @Rule
@@ -107,6 +109,8 @@ public class TestAsyncTable {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
+      MAX_KEY_VALUE_SIZE);
     TEST_UTIL.startMiniCluster(1);
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
@@ -146,6 +150,7 @@ public class TestAsyncTable {
     return Bytes.toBytes(Bytes.toString(base) + "-" + index);
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   @Test
   public void testSimpleMultiple() throws Exception {
     AsyncTable<?> table = getTable.get();
@@ -153,19 +158,19 @@ public class TestAsyncTable {
     CountDownLatch putLatch = new CountDownLatch(count);
     IntStream.range(0, count).forEach(
       i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
-          .thenAccept(x -> putLatch.countDown()));
+        .thenAccept(x -> putLatch.countDown()));
     putLatch.await();
     BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
     IntStream.range(0, count)
-        .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
-            .thenAccept(x -> existsResp.add(x)));
+      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
+        .thenAccept(x -> existsResp.add(x)));
     for (int i = 0; i < count; i++) {
       assertTrue(existsResp.take());
     }
     BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
     IntStream.range(0, count)
-        .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
-            .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
+      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
+        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
     for (int i = 0; i < count; i++) {
       Pair<Integer, Result> pair = getResp.take();
       assertArrayEquals(concat(VALUE, pair.getFirst()),
@@ -176,20 +181,21 @@ public class TestAsyncTable {
       i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
     deleteLatch.await();
     IntStream.range(0, count)
-        .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
-            .thenAccept(x -> existsResp.add(x)));
+      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
+        .thenAccept(x -> existsResp.add(x)));
     for (int i = 0; i < count; i++) {
       assertFalse(existsResp.take());
     }
     IntStream.range(0, count)
-        .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
-            .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
+      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
+        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
     for (int i = 0; i < count; i++) {
       Pair<Integer, Result> pair = getResp.take();
       assertTrue(pair.getSecond().isEmpty());
     }
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   @Test
   public void testIncrement() throws InterruptedException, ExecutionException {
     AsyncTable<?> table = getTable.get();
@@ -197,16 +203,17 @@ public class TestAsyncTable {
     CountDownLatch latch = new CountDownLatch(count);
     AtomicLong sum = new AtomicLong(0L);
     IntStream.range(0, count)
-        .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
-          sum.addAndGet(x);
-          latch.countDown();
-        }));
+      .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
+        sum.addAndGet(x);
+        latch.countDown();
+      }));
     latch.await();
     assertEquals(count, Bytes.toLong(
       table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
     assertEquals((1 + count) * count / 2, sum.get());
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   @Test
   public void testAppend() throws InterruptedException, ExecutionException {
     AsyncTable<?> table = getTable.get();
@@ -214,22 +221,24 @@ public class TestAsyncTable {
     CountDownLatch latch = new CountDownLatch(count);
     char suffix = ':';
     AtomicLong suffixCount = new AtomicLong(0L);
-    IntStream.range(0, count).forEachOrdered(
-      i -> table.append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
-          .thenAccept(r -> {
-            suffixCount.addAndGet(Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars()
-                .filter(x -> x == suffix).count());
-            latch.countDown();
-          }));
+    IntStream.range(0, count)
+      .forEachOrdered(i -> table
+        .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
+        .thenAccept(r -> {
+          suffixCount.addAndGet(
+            Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count());
+          latch.countDown();
+        }));
     latch.await();
     assertEquals((1 + count) * count / 2, suffixCount.get());
     String value = Bytes.toString(
       table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
     int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
-        .sorted().toArray();
+      .sorted().toArray();
     assertArrayEquals(IntStream.range(0, count).toArray(), actual);
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   @Test
   public void testCheckAndPut() throws InterruptedException, ExecutionException {
     AsyncTable<?> table = getTable.get();
@@ -238,20 +247,21 @@ public class TestAsyncTable {
     int count = 10;
     CountDownLatch latch = new CountDownLatch(count);
     IntStream.range(0, count)
-        .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
-            .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
-              if (x) {
-                successCount.incrementAndGet();
-                successIndex.set(i);
-              }
-              latch.countDown();
-            }));
+      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
+        .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
+          if (x) {
+            successCount.incrementAndGet();
+            successIndex.set(i);
+          }
+          latch.countDown();
+        }));
     latch.await();
     assertEquals(1, successCount.get());
     String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
     assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   @Test
   public void testCheckAndDelete() throws InterruptedException, ExecutionException {
     AsyncTable<?> table = getTable.get();
@@ -259,24 +269,24 @@ public class TestAsyncTable {
     CountDownLatch putLatch = new CountDownLatch(count + 1);
     table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
     IntStream.range(0, count)
-        .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
-            .thenRun(() -> putLatch.countDown()));
+      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
+        .thenRun(() -> putLatch.countDown()));
     putLatch.await();
 
     AtomicInteger successCount = new AtomicInteger(0);
     AtomicInteger successIndex = new AtomicInteger(-1);
     CountDownLatch deleteLatch = new CountDownLatch(count);
     IntStream.range(0, count)
-        .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
-            .thenDelete(
-              new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
-            .thenAccept(x -> {
-              if (x) {
-                successCount.incrementAndGet();
-                successIndex.set(i);
-              }
-              deleteLatch.countDown();
-            }));
+      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
+        .thenDelete(
+          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
+        .thenAccept(x -> {
+          if (x) {
+            successCount.incrementAndGet();
+            successIndex.set(i);
+          }
+          deleteLatch.countDown();
+        }));
     deleteLatch.await();
     assertEquals(1, successCount.get());
     Result result = table.get(new Get(row)).get();
@@ -307,6 +317,7 @@ public class TestAsyncTable {
     assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   @Test
   public void testCheckAndMutate() throws InterruptedException, ExecutionException {
     AsyncTable<?> table = getTable.get();
@@ -314,8 +325,8 @@ public class TestAsyncTable {
     CountDownLatch putLatch = new CountDownLatch(count + 1);
     table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
     IntStream.range(0, count)
-        .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
-            .thenRun(() -> putLatch.countDown()));
+      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
+        .thenRun(() -> putLatch.countDown()));
     putLatch.await();
 
     AtomicInteger successCount = new AtomicInteger(0);
@@ -331,13 +342,13 @@ public class TestAsyncTable {
         throw new UncheckedIOException(e);
       }
       table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
-          .thenAccept(x -> {
-            if (x) {
-              successCount.incrementAndGet();
-              successIndex.set(i);
-            }
-            mutateLatch.countDown();
-          });
+        .thenAccept(x -> {
+          if (x) {
+            successCount.incrementAndGet();
+            successIndex.set(i);
+          }
+          mutateLatch.countDown();
+        });
     });
     mutateLatch.await();
     assertEquals(1, successCount.get());
@@ -358,57 +369,35 @@ public class TestAsyncTable {
     Put put = new Put(row);
     put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
 
-    boolean ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
-      .ifNotExists()
-      .thenPut(put)
-      .get();
+    boolean ok =
+      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get();
     assertTrue(ok);
 
-    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
-      .timeRange(TimeRange.at(ts + 10000))
-      .ifEquals(VALUE)
-      .thenPut(put)
-      .get();
+    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
+      .ifEquals(VALUE).thenPut(put).get();
     assertFalse(ok);
 
-    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
-      .timeRange(TimeRange.at(ts))
-      .ifEquals(VALUE)
-      .thenPut(put)
-      .get();
+    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
+      .ifEquals(VALUE).thenPut(put).get();
     assertTrue(ok);
 
-    RowMutations rm = new RowMutations(row)
-      .add((Mutation) put);
-    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
-      .timeRange(TimeRange.at(ts + 10000))
-      .ifEquals(VALUE)
-      .thenMutate(rm)
-      .get();
+    RowMutations rm = new RowMutations(row).add((Mutation) put);
+    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
+      .ifEquals(VALUE).thenMutate(rm).get();
     assertFalse(ok);
 
-    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
-      .timeRange(TimeRange.at(ts))
-      .ifEquals(VALUE)
-      .thenMutate(rm)
-      .get();
+    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
+      .ifEquals(VALUE).thenMutate(rm).get();
     assertTrue(ok);
 
-    Delete delete = new Delete(row)
-      .addColumn(FAMILY, QUALIFIER);
+    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
 
-    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
-      .timeRange(TimeRange.at(ts + 10000))
-      .ifEquals(VALUE)
-      .thenDelete(delete)
-      .get();
+    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
+      .ifEquals(VALUE).thenDelete(delete).get();
     assertFalse(ok);
 
-    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
-      .timeRange(TimeRange.at(ts))
-      .ifEquals(VALUE)
-      .thenDelete(delete)
-      .get();
+    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
+      .ifEquals(VALUE).thenDelete(delete).get();
     assertTrue(ok);
   }
 
@@ -424,4 +413,22 @@ public class TestAsyncTable {
       assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
     }
   }
+
+  @Test
+  public void testInvalidPut() {
+    try {
+      getTable.get().put(new Put(Bytes.toBytes(0)));
+      fail("Should fail since the put does not contain any cells");
+    } catch (IllegalArgumentException e) {
+      assertThat(e.getMessage(), containsString("No columns to insert"));
+    }
+
+    try {
+      getTable.get()
+        .put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]));
+      fail("Should fail since the put exceeds the max key value size");
+    } catch (IllegalArgumentException e) {
+      assertThat(e.getMessage(), containsString("KeyValue size too large"));
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
index 717eb24..42e61d7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -87,6 +88,8 @@ public class TestAsyncTableBatch {
 
   private static byte[][] SPLIT_KEYS;
 
+  private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
+
   @Parameter(0)
   public String tableType;
 
@@ -111,6 +114,8 @@ public class TestAsyncTableBatch {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
+      MAX_KEY_VALUE_SIZE);
     TEST_UTIL.startMiniCluster(3);
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
@@ -224,8 +229,8 @@ public class TestAsyncTableBatch {
     actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
     actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
     RowMutations rm = new RowMutations(Bytes.toBytes(5));
-    rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L)));
-    rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L)));
+    rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L)));
+    rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L)));
     actions.add(rm);
     actions.add(new Get(Bytes.toBytes(6)));
 
@@ -308,4 +313,24 @@ public class TestAsyncTableBatch {
     assertEquals("good",
       Bytes.toString(table.get(new Get(Bytes.toBytes("put"))).get().getValue(FAMILY, CQ)));
   }
+
+  @Test
+  public void testInvalidPut() {
+    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
+    try {
+      table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)), new Put(Bytes.toBytes(0))));
+      fail("Should fail since the put does not contain any cells");
+    } catch (IllegalArgumentException e) {
+      assertThat(e.getMessage(), containsString("No columns to insert"));
+    }
+
+    try {
+      table.batch(
+        Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]),
+          new Delete(Bytes.toBytes(0))));
+      fail("Should fail since the put exceeds the max key value size");
+    } catch (IllegalArgumentException e) {
+      assertThat(e.getMessage(), containsString("KeyValue size too large"));
+    }
+  }
 }