You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/02/02 18:25:12 UTC
[accumulo] 02/06: Revert "ACCUMULO-4778 Cache table name to id map (#364)"
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 1.7
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 505d999b6944d8a91201257a4891a1ced6f64e25
Author: Keith Turner <kt...@apache.org>
AuthorDate: Fri Feb 2 12:12:25 2018 -0500
Revert "ACCUMULO-4778 Cache table name to id map (#364)"
This reverts commit 5adeb4b7ed561a0bcea1a1def17835310831662f.
---
.../client/impl/MultiTableBatchWriterImpl.java | 79 ++++++++++++-
.../apache/accumulo/core/client/impl/TableMap.java | 100 -----------------
.../apache/accumulo/core/client/impl/Tables.java | 123 +++++++++++----------
.../accumulo/test/MultiTableBatchWriterIT.java | 119 +++++++++++++++++++-
4 files changed, 259 insertions(+), 162 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
index e7a6d73..f5e1fa0 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
@@ -19,26 +19,37 @@ package org.apache.accumulo.core.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.master.state.tables.TableState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;
public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
+ public static final long DEFAULT_CACHE_TIME = 200;
+ public static final TimeUnit DEFAULT_CACHE_TIME_UNIT = TimeUnit.MILLISECONDS;
private static final Logger log = LoggerFactory.getLogger(MultiTableBatchWriterImpl.class);
private AtomicBoolean closed;
+ private AtomicLong cacheLastState;
private class TableBatchWriter implements BatchWriter {
@@ -71,17 +82,49 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
}
+ /**
+ * CacheLoader which will look up the internal table ID for a given table name.
+ */
+ private class TableNameToIdLoader extends CacheLoader<String,String> {
+
+ @Override
+ public String load(String tableName) throws Exception {
+ Instance instance = context.getInstance();
+ String tableId = Tables.getNameToIdMap(instance).get(tableName);
+
+ if (tableId == null)
+ throw new TableNotFoundException(null, tableName, null);
+
+ if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+ throw new TableOfflineException(instance, tableId);
+
+ return tableId;
+ }
+
+ }
+
private TabletServerBatchWriter bw;
private ConcurrentHashMap<String,BatchWriter> tableWriters;
private final ClientContext context;
+ private final LoadingCache<String,String> nameToIdCache;
public MultiTableBatchWriterImpl(ClientContext context, BatchWriterConfig config) {
+ this(context, config, DEFAULT_CACHE_TIME, DEFAULT_CACHE_TIME_UNIT);
+ }
+
+ public MultiTableBatchWriterImpl(ClientContext context, BatchWriterConfig config, long cacheTime, TimeUnit cacheTimeUnit) {
checkArgument(context != null, "context is null");
checkArgument(config != null, "config is null");
+ checkArgument(cacheTimeUnit != null, "cacheTimeUnit is null");
this.context = context;
this.bw = new TabletServerBatchWriter(context, config);
tableWriters = new ConcurrentHashMap<>();
this.closed = new AtomicBoolean(false);
+ this.cacheLastState = new AtomicLong(0);
+
+ // Potentially up to ~500k used to cache names to IDs with "segments" of (maybe) ~1000 entries
+ nameToIdCache = CacheBuilder.newBuilder().expireAfterWrite(cacheTime, cacheTimeUnit).concurrencyLevel(10).maximumSize(10000).initialCapacity(20)
+ .build(new TableNameToIdLoader());
}
@Override
@@ -118,7 +161,7 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
*/
private String getId(String tableName) throws TableNotFoundException {
try {
- return Tables.getTableId(context.inst, tableName);
+ return nameToIdCache.get(tableName);
} catch (UncheckedExecutionException e) {
Throwable cause = e.getCause();
@@ -133,6 +176,20 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
}
throw e;
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+
+ log.error("Unexpected exception when fetching table id for " + tableName);
+
+ if (null == cause) {
+ throw new RuntimeException(e);
+ } else if (cause instanceof TableNotFoundException) {
+ throw (TableNotFoundException) cause;
+ } else if (cause instanceof TableOfflineException) {
+ throw (TableOfflineException) cause;
+ }
+
+ throw new RuntimeException(e);
}
}
@@ -140,6 +197,26 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
public BatchWriter getBatchWriter(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
+ while (true) {
+ long cacheResetCount = Tables.getCacheResetCount();
+
+ // cacheResetCount could change after this point in time, but that is ok because we just want to ensure this method sees changes
+ // made before it was called.
+
+ long internalResetCount = cacheLastState.get();
+
+ if (cacheResetCount > internalResetCount) {
+ if (!cacheLastState.compareAndSet(internalResetCount, cacheResetCount)) {
+ continue; // concurrent operation, let's not possibly move cacheLastState backwards in the case where a thread pauses for a long time
+ }
+
+ nameToIdCache.invalidateAll();
+ break;
+ }
+
+ break;
+ }
+
String tableId = getId(tableName);
BatchWriter tbw = tableWriters.get(tableId);
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java
deleted file mode 100644
index 3f3d90c..0000000
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.impl;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.core.client.impl.Tables.qualified;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.NamespaceNotFoundException;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Used for thread safe caching of immutable table ID maps. See ACCUMULO-4778.
- */
-public class TableMap {
- private static final Logger log = LoggerFactory.getLogger(TableMap.class);
-
- private final Map<String,String> tableNameToIdMap;
- private final Map<String,String> tableIdToNameMap;
-
- public TableMap(Instance instance, ZooCache zooCache) {
- List<String> tableIds = zooCache.getChildren(ZooUtil.getRoot(instance) + Constants.ZTABLES);
- Map<String,String> namespaceIdToNameMap = new HashMap<>();
- ImmutableMap.Builder<String,String> tableNameToIdBuilder = new ImmutableMap.Builder<>();
- ImmutableMap.Builder<String,String> tableIdToNameBuilder = new ImmutableMap.Builder<>();
- // use StringBuilder to construct zPath string efficiently across many tables
- StringBuilder zPathBuilder = new StringBuilder();
- zPathBuilder.append(ZooUtil.getRoot(instance)).append(Constants.ZTABLES).append("/");
- int prefixLength = zPathBuilder.length();
-
- for (String tableId : tableIds) {
- // reset StringBuilder to prefix length before appending ID and suffix
- zPathBuilder.setLength(prefixLength);
- zPathBuilder.append(tableId).append(Constants.ZTABLE_NAME);
- byte[] tableName = zooCache.get(zPathBuilder.toString());
- zPathBuilder.setLength(prefixLength);
- zPathBuilder.append(tableId).append(Constants.ZTABLE_NAMESPACE);
- byte[] nId = zooCache.get(zPathBuilder.toString());
-
- String namespaceName = Namespaces.DEFAULT_NAMESPACE;
- // create fully qualified table name
- if (nId == null) {
- namespaceName = null;
- } else {
- String namespaceId = new String(nId, UTF_8);
- if (!namespaceId.equals(Namespaces.DEFAULT_NAMESPACE_ID)) {
- try {
- namespaceName = namespaceIdToNameMap.get(namespaceId);
- if (namespaceName == null) {
- namespaceName = Namespaces.getNamespaceName(instance, namespaceId);
- namespaceIdToNameMap.put(namespaceId, namespaceName);
- }
- } catch (NamespaceNotFoundException e) {
- log.error("Table (" + tableId + ") contains reference to namespace (" + namespaceId + ") that doesn't exist", e);
- continue;
- }
- }
- }
- if (tableName != null && namespaceName != null) {
- String tableNameStr = qualified(new String(tableName, UTF_8), namespaceName);
- tableNameToIdBuilder.put(tableNameStr, tableId);
- tableIdToNameBuilder.put(tableId, tableNameStr);
- }
- }
- tableNameToIdMap = tableNameToIdBuilder.build();
- tableIdToNameMap = tableIdToNameBuilder.build();
- }
-
- public Map<String,String> getNameToIdMap() {
- return tableNameToIdMap;
- }
-
- public Map<String,String> getIdtoNameMap() {
- return tableIdToNameMap;
- }
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
index a93347c..fcf838f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
@@ -20,11 +20,12 @@ import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.security.SecurityPermission;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
@@ -36,49 +37,64 @@ import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class Tables {
+ private static final Logger log = LoggerFactory.getLogger(Tables.class);
public static final String VALID_NAME_REGEX = "^(\\w+\\.)?(\\w+)$";
private static final SecurityPermission TABLES_PERMISSION = new SecurityPermission("tablesPermission");
- // Per instance cache will expire after 10 minutes in case we encounter an instance not used frequently
- private static Cache<String,TableMap> instanceToMapCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
- private static Cache<String,ZooCache> instanceToZooCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
+ private static final AtomicLong cacheResetCount = new AtomicLong(0);
- /**
- * Return the cached ZooCache for provided instance. ZooCache is initially created with a watcher that will clear the TableMap cache for that instance when
- * WatchedEvent occurs.
- */
- private static ZooCache getZooCache(final Instance instance) {
+ private static ZooCache getZooCache(Instance instance) {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(TABLES_PERMISSION);
}
- final String zks = instance.getZooKeepers();
- final int timeOut = instance.getZooKeepersSessionTimeOut();
- final String uuid = instance.getInstanceID();
+ return new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
+ }
- try {
- return instanceToZooCache.get(uuid, new Callable<ZooCache>() {
- @Override
- public ZooCache call() {
- return new ZooCacheFactory().getZooCache(zks, timeOut, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- instanceToMapCache.invalidate(uuid);
+ private static SortedMap<String,String> getMap(Instance instance, boolean nameAsKey) {
+ ZooCache zc = getZooCache(instance);
+
+ List<String> tableIds = zc.getChildren(ZooUtil.getRoot(instance) + Constants.ZTABLES);
+ TreeMap<String,String> tableMap = new TreeMap<>();
+ Map<String,String> namespaceIdToNameMap = new HashMap<>();
+
+ for (String tableId : tableIds) {
+ byte[] tableName = zc.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_NAME);
+ byte[] nId = zc.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_NAMESPACE);
+ String namespaceName = Namespaces.DEFAULT_NAMESPACE;
+ // create fully qualified table name
+ if (nId == null) {
+ namespaceName = null;
+ } else {
+ String namespaceId = new String(nId, UTF_8);
+ if (!namespaceId.equals(Namespaces.DEFAULT_NAMESPACE_ID)) {
+ try {
+ namespaceName = namespaceIdToNameMap.get(namespaceId);
+ if (namespaceName == null) {
+ namespaceName = Namespaces.getNamespaceName(instance, namespaceId);
+ namespaceIdToNameMap.put(namespaceId, namespaceName);
}
- });
+ } catch (NamespaceNotFoundException e) {
+ log.error("Table (" + tableId + ") contains reference to namespace (" + namespaceId + ") that doesn't exist", e);
+ continue;
+ }
}
- });
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
+ }
+ if (tableName != null && namespaceName != null) {
+ String tableNameStr = qualified(new String(tableName, UTF_8), namespaceName);
+ if (nameAsKey)
+ tableMap.put(tableNameStr, tableId);
+ else
+ tableMap.put(tableId, tableNameStr);
+ }
}
+
+ return tableMap;
}
public static String getTableId(Instance instance, String tableName) throws TableNotFoundException {
@@ -113,31 +129,12 @@ public class Tables {
return tableName;
}
- public static Map<String,String> getNameToIdMap(Instance instance) {
- return getTableMap(instance).getNameToIdMap();
- }
-
- public static Map<String,String> getIdToNameMap(Instance instance) {
- return getTableMap(instance).getIdtoNameMap();
+ public static SortedMap<String,String> getNameToIdMap(Instance instance) {
+ return getMap(instance, true);
}
- /**
- * Get the TableMap from the cache. A new one will be populated when needed. Cache is cleared manually by calling {@link #clearCache(Instance)} or
- * automatically cleared by ZooCache watcher created in {@link #getZooCache(Instance)}. See ACCUMULO-4778.
- */
- private static TableMap getTableMap(final Instance instance) {
- TableMap map;
- try {
- map = instanceToMapCache.get(instance.getInstanceID(), new Callable<TableMap>() {
- @Override
- public TableMap call() {
- return new TableMap(instance, getZooCache(instance));
- }
- });
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- return map;
+ public static SortedMap<String,String> getIdToNameMap(Instance instance) {
+ return getMap(instance, false);
}
public static boolean exists(Instance instance, String tableId) {
@@ -147,9 +144,9 @@ public class Tables {
}
public static void clearCache(Instance instance) {
+ cacheResetCount.incrementAndGet();
getZooCache(instance).clear(ZooUtil.getRoot(instance) + Constants.ZTABLES);
getZooCache(instance).clear(ZooUtil.getRoot(instance) + Constants.ZNAMESPACES);
- instanceToMapCache.invalidate(instance.getInstanceID());
}
/**
@@ -161,9 +158,17 @@ public class Tables {
* A zookeeper path
*/
public static void clearCacheByPath(Instance instance, final String zooPath) {
- String thePath = zooPath.startsWith("/") ? zooPath : "/" + zooPath;
+
+ String thePath;
+
+ if (zooPath.startsWith("/")) {
+ thePath = zooPath;
+ } else {
+ thePath = "/" + zooPath;
+ }
+
getZooCache(instance).clear(ZooUtil.getRoot(instance) + thePath);
- instanceToMapCache.invalidate(instance.getInstanceID());
+
}
public static String getPrintableTableNameFromId(Map<String,String> tidToNameMap, String tableId) {
@@ -224,6 +229,10 @@ public class Tables {
}
+ public static long getCacheResetCount() {
+ return cacheResetCount.get();
+ }
+
public static String qualified(String tableName) {
return qualified(tableName, Namespaces.DEFAULT_NAMESPACE);
}
diff --git a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java
index fa5d8bb..f9720f0 100644
--- a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -30,6 +31,7 @@ import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.client.impl.Credentials;
@@ -59,12 +61,12 @@ public class MultiTableBatchWriterIT extends AccumuloClusterIT {
@Before
public void setUpArgs() throws AccumuloException, AccumuloSecurityException {
connector = getConnector();
- mtbw = getMultiTableBatchWriter();
+ mtbw = getMultiTableBatchWriter(60);
}
- public MultiTableBatchWriter getMultiTableBatchWriter() {
+ public MultiTableBatchWriter getMultiTableBatchWriter(long cacheTimeoutInSeconds) {
ClientContext context = new ClientContext(connector.getInstance(), new Credentials(getAdminPrincipal(), getAdminToken()), getCluster().getClientConfig());
- return new MultiTableBatchWriterImpl(context, new BatchWriterConfig());
+ return new MultiTableBatchWriterImpl(context, new BatchWriterConfig(), cacheTimeoutInSeconds, TimeUnit.SECONDS);
}
@Test
@@ -263,7 +265,7 @@ public class MultiTableBatchWriterIT extends AccumuloClusterIT {
@Test
public void testTableRenameNewWritersNoCaching() throws Exception {
- mtbw = getMultiTableBatchWriter();
+ mtbw = getMultiTableBatchWriter(0);
try {
final String[] names = getUniqueNames(4);
@@ -404,4 +406,113 @@ public class MultiTableBatchWriterIT extends AccumuloClusterIT {
Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected);
}
+
+ @Test
+ public void testOfflineTableWithCache() throws Exception {
+ boolean mutationsRejected = false;
+
+ try {
+ final String[] names = getUniqueNames(2);
+ final String table1 = names[0], table2 = names[1];
+
+ TableOperations tops = connector.tableOperations();
+ tops.create(table1);
+ tops.create(table2);
+
+ BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = mtbw.getBatchWriter(table2);
+
+ Mutation m1 = new Mutation("foo");
+ m1.put("col1", "", "val1");
+ m1.put("col2", "", "val2");
+
+ bw1.addMutation(m1);
+ bw2.addMutation(m1);
+
+ tops.offline(table1);
+
+ try {
+ bw1 = mtbw.getBatchWriter(table1);
+ } catch (TableOfflineException e) {
+ // pass
+ mutationsRejected = true;
+ }
+
+ tops.offline(table2);
+
+ try {
+ bw2 = mtbw.getBatchWriter(table2);
+ } catch (TableOfflineException e) {
+ // pass
+ mutationsRejected = true;
+ }
+ } finally {
+ if (null != mtbw) {
+ try {
+ // Mutations might have flushed before the table offline occurred
+ mtbw.close();
+ } catch (MutationsRejectedException e) {
+ // Pass
+ mutationsRejected = true;
+ }
+ }
+ }
+
+ Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected);
+ }
+
+ @Test
+ public void testOfflineTableWithoutCache() throws Exception {
+ mtbw = getMultiTableBatchWriter(0);
+ boolean mutationsRejected = false;
+
+ try {
+ final String[] names = getUniqueNames(2);
+ final String table1 = names[0], table2 = names[1];
+
+ TableOperations tops = connector.tableOperations();
+ tops.create(table1);
+ tops.create(table2);
+
+ BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = mtbw.getBatchWriter(table2);
+
+ Mutation m1 = new Mutation("foo");
+ m1.put("col1", "", "val1");
+ m1.put("col2", "", "val2");
+
+ bw1.addMutation(m1);
+ bw2.addMutation(m1);
+
+ // Mutations might or might not flush before the tables go offline
+ tops.offline(table1);
+ tops.offline(table2);
+
+ try {
+ bw1 = mtbw.getBatchWriter(table1);
+ Assert.fail(table1 + " should be offline");
+ } catch (TableOfflineException e) {
+ // pass
+ mutationsRejected = true;
+ }
+
+ try {
+ bw2 = mtbw.getBatchWriter(table2);
+ Assert.fail(table1 + " should be offline");
+ } catch (TableOfflineException e) {
+ // pass
+ mutationsRejected = true;
+ }
+ } finally {
+ if (null != mtbw) {
+ try {
+ // Mutations might have flushed before the table offline occurred
+ mtbw.close();
+ } catch (MutationsRejectedException e) {
+ // Pass
+ mutationsRejected = true;
+ }
+ }
+ }
+
+ Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected);
+ }
}
--
To stop receiving notification emails like this one, please contact
kturner@apache.org.