Posted to commits@accumulo.apache.org by el...@apache.org on 2013/11/18 04:30:13 UTC

[1/3] git commit: ACCUMULO-1833 Squashed merge of multiple commits that let MTBW work much more efficiently with concurrent access.

Updated Branches:
  refs/heads/master d0b39ca23 -> edf8e28a2


ACCUMULO-1833 Squashed merge of multiple commits that let MTBW work much more efficiently with concurrent access.

Squashed commit of the following:

commit 58d61759cdc673cc5ee86ad1176b7db3b2955679
Author: Josh Elser <el...@apache.org>
Date:   Fri Nov 15 14:26:29 2013 -0800

    ACCUMULO-1833 Recommended changes from Keith regarding previous fixes.

    Guava cache exception throwing is now covered in tests. Added an additional test to exercise table rename. Updated the state check
    to be more proactive and less susceptible to a paused thread.

commit dd73f52180ca00623469850c4b2d4b03c3768837
Author: Josh Elser <el...@apache.org>
Date:   Tue Nov 12 18:00:07 2013 -0800

    ACCUMULO-1833 Swap the AtomicInteger for an AtomicLong to make it slightly more robust.

commit 9f7916db23adfd561254b432e5f5a5c4e9b02e54
Author: Josh Elser <jo...@gmail.com>
Date:   Fri Nov 8 11:36:06 2013 -0500

    ACCUMULO-1833 Simple usage of AtomicInteger to catch table cache
    invalidations and propagate them through MTBW's cache.

commit e8cb6c8ef53afaf41eb9e574607cb03093eec1e8
Author: Josh Elser <jo...@gmail.com>
Date:   Fri Nov 8 10:55:01 2013 -0500

    ACCUMULO-1833 Remove Connector client methods, but leave constructor on
    MTBW in place for testing purposes.

commit b6c6c0270a8bf52d99e0463b2acc98910c4087ca
Author: Josh Elser <jo...@gmail.com>
Date:   Thu Nov 7 22:22:19 2013 -0500

    ACCUMULO-1833 Ensure that we close the MTBW at the end of the test to
    avoid it getting GC'ed later and trying to flush when ZK and the
    instance are already gone.

commit a11883e62de57eaacf0aba6a5019b7abe79563ec
Author: Josh Elser <jo...@gmail.com>
Date:   Thu Nov 7 22:21:29 2013 -0500

    ACCUMULO-1833 Update MTBW close method to match what TSBW is doing
    (update internal boolean then perform the close)

commit e634ca03f326070a42a811d1ed9a181df5214a03
Author: Josh Elser <jo...@gmail.com>
Date:   Thu Nov 7 20:54:20 2013 -0500

    ACCUMULO-1833 Another instance of an unsynchronized primitive being used instead of an
    AtomicBoolean where concurrent access is expected.

commit ffe8c243dec4d7c7947cc6512394e9a70a29bc77
Author: Josh Elser <jo...@gmail.com>
Date:   Thu Nov 7 20:37:28 2013 -0500

    ACCUMULO-1833 Tests for expected functionality in the face of table
    operations.

commit 721616e3ff6a4200fd326b7f1ce4be6e1298a7ec
Author: Josh Elser <jo...@gmail.com>
Date:   Thu Nov 7 16:49:41 2013 -0500

    ACCUMULO-1833 Rework the getBatchWriter method on MTBW to remove
    zookeeper lock contention and get better concurrent throughput.
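
    (For context, the squashed commits above center on replacing a synchronized
    ZooKeeper lookup with a Guava-backed name-to-ID cache. A minimal standalone
    sketch of that pattern follows; it is independent of the Accumulo code, and
    the fetchTableId helper is a hypothetical stand-in for the ZooKeeper lookup:)

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    public class NameToIdCacheSketch {
      // Hypothetical stand-in for the real ZooKeeper-backed lookup.
      static String fetchTableId(String tableName) {
        return "id-for-" + tableName;
      }

      public static void main(String[] args) throws Exception {
        // Entries expire shortly after being written, so renames and deletes
        // are noticed without a ZooKeeper round trip on every lookup.
        LoadingCache<String,String> nameToId = CacheBuilder.newBuilder()
            .expireAfterWrite(200, TimeUnit.MILLISECONDS)
            .maximumSize(10000)
            .build(new CacheLoader<String,String>() {
              @Override
              public String load(String tableName) {
                return fetchTableId(tableName);
              }
            });

        System.out.println(nameToId.get("table1")); // loads, then caches
        System.out.println(nameToId.get("table1")); // served from cache
      }
    }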


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6b87c870
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6b87c870
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6b87c870

Branch: refs/heads/master
Commit: 6b87c870d9475f024911649deb6eeb614325d00a
Parents: 1261625
Author: Josh Elser <el...@apache.org>
Authored: Fri Nov 15 14:30:59 2013 -0800
Committer: Josh Elser <el...@apache.org>
Committed: Fri Nov 15 14:30:59 2013 -0800

----------------------------------------------------------------------
 core/pom.xml                                    |   4 +
 .../client/impl/MultiTableBatchWriterImpl.java  | 166 ++++--
 .../accumulo/core/client/impl/Tables.java       |   7 +
 .../test/MultiTableBatchWriterTest.java         | 539 +++++++++++++++++++
 4 files changed, 683 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/6b87c870/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index f7539f5..d02a3cd 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -30,6 +30,10 @@
       <artifactId>jcommander</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
       <groupId>jline</groupId>
       <artifactId>jline</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6b87c870/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
index 4537ae8..49c02d9 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
@@ -16,7 +16,11 @@
  */
 package org.apache.accumulo.core.client.impl;
 
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -33,68 +37,107 @@ import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.log4j.Logger;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
 public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
+  public static final long DEFAULT_CACHE_TIME = 200;
+  public static final TimeUnit DEFAULT_CACHE_TIME_UNIT = TimeUnit.MILLISECONDS;
+
   static final Logger log = Logger.getLogger(MultiTableBatchWriterImpl.class);
-  private boolean closed;
-  
+  private AtomicBoolean closed;
+  private AtomicLong cacheLastState;
+
   private class TableBatchWriter implements BatchWriter {
-    
+
     private String table;
-    
+
     TableBatchWriter(String table) {
       this.table = table;
     }
-    
+
     @Override
     public void addMutation(Mutation m) throws MutationsRejectedException {
       ArgumentChecker.notNull(m);
       bw.addMutation(table, m);
     }
-    
+
     @Override
     public void addMutations(Iterable<Mutation> iterable) throws MutationsRejectedException {
       bw.addMutation(table, iterable.iterator());
     }
-    
+
     @Override
     public void close() {
       throw new UnsupportedOperationException("Must close all tables, can not close an individual table");
     }
-    
+
     @Override
     public void flush() {
       throw new UnsupportedOperationException("Must flush all tables, can not flush an individual table");
     }
-    
+
   }
-  
+
+  /**
+   * CacheLoader which will look up the internal table ID for a given table name.
+   */
+  private class TableNameToIdLoader extends CacheLoader<String,String> {
+
+    @Override
+    public String load(String tableName) throws Exception {
+      String tableId = Tables.getNameToIdMap(instance).get(tableName);
+
+      if (tableId == null)
+        throw new TableNotFoundException(tableId, tableName, null);
+
+      if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+        throw new TableOfflineException(instance, tableId);
+
+      return tableId;
+    }
+
+  }
+
   private TabletServerBatchWriter bw;
-  private HashMap<String,BatchWriter> tableWriters;
+  private ConcurrentHashMap<String,BatchWriter> tableWriters;
   private Instance instance;
-  
+  private final LoadingCache<String,String> nameToIdCache;
+
   public MultiTableBatchWriterImpl(Instance instance, TCredentials credentials, BatchWriterConfig config) {
-    ArgumentChecker.notNull(instance, credentials);
+    this(instance, credentials, config, DEFAULT_CACHE_TIME, DEFAULT_CACHE_TIME_UNIT);
+  }
+
+  public MultiTableBatchWriterImpl(Instance instance, TCredentials credentials, BatchWriterConfig config, long cacheTime, TimeUnit cacheTimeUnit) {
+    ArgumentChecker.notNull(instance, credentials, config, cacheTimeUnit);
     this.instance = instance;
     this.bw = new TabletServerBatchWriter(instance, credentials, config);
-    tableWriters = new HashMap<String,BatchWriter>();
-    this.closed = false;
+    tableWriters = new ConcurrentHashMap<String,BatchWriter>();
+    this.closed = new AtomicBoolean(false);
+    this.cacheLastState = new AtomicLong(0);
+
+    // Potentially up to ~500k used to cache names to IDs with "segments" of (maybe) ~1000 entries
+    nameToIdCache = CacheBuilder.newBuilder().expireAfterWrite(cacheTime, cacheTimeUnit).concurrencyLevel(10).maximumSize(10000).initialCapacity(20)
+        .build(new TableNameToIdLoader());
   }
-  
+
   public boolean isClosed() {
-    return this.closed;
+    return this.closed.get();
   }
-  
+
   public void close() throws MutationsRejectedException {
+    this.closed.set(true);
     bw.close();
-    this.closed = true;
   }
-  
+
   /**
    * Warning: do not rely upon finalize to close this class. Finalize is not guaranteed to be called.
    */
   @Override
   protected void finalize() {
-    if (!closed) {
+    if (!closed.get()) {
       log.warn(MultiTableBatchWriterImpl.class.getSimpleName() + " not shutdown; did you forget to call close()?");
       try {
         close();
@@ -104,17 +147,74 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
       }
     }
   }
-  
+
+  /**
+   * Returns the table ID for the given table name.
+   * 
+   * @param tableName
+   *          The name of the table for which to find the ID
+   * @return The table ID for the given table name
+   * @throws TableNotFoundException
+   *           if no table with the given name exists
+   */
+  private String getId(String tableName) throws TableNotFoundException {
+    try {
+      return nameToIdCache.get(tableName);
+    } catch (UncheckedExecutionException e) {
+      Throwable cause = e.getCause();
+
+      log.error("Unexpected exception when fetching table id for " + tableName);
+
+      if (null == cause) {
+        throw new RuntimeException(e);
+      } else if (cause instanceof TableNotFoundException) {
+        throw (TableNotFoundException) cause;
+      } else if (cause instanceof TableOfflineException) {
+        throw (TableOfflineException) cause;
+      }
+
+      throw e;
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+
+      log.error("Unexpected exception when fetching table id for " + tableName);
+
+      if (null == cause) {
+        throw new RuntimeException(e);
+      } else if (cause instanceof TableNotFoundException) {
+        throw (TableNotFoundException) cause;
+      } else if (cause instanceof TableOfflineException) {
+        throw (TableOfflineException) cause;
+      }
+
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
-  public synchronized BatchWriter getBatchWriter(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+  public BatchWriter getBatchWriter(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
     ArgumentChecker.notNull(tableName);
-    String tableId = Tables.getNameToIdMap(instance).get(tableName);
-    if (tableId == null)
-      throw new TableNotFoundException(tableId, tableName, null);
-    
-    if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
-      throw new TableOfflineException(instance, tableId);
-    
+
+    while (true) {
+      long cacheResetCount = Tables.getCacheResetCount();
+
+      // cacheResetCount could change after this point in time, but I think that's ok because we just want to ensure this method sees changes
+      // made before it was called.
+      
+      long internalResetCount = cacheLastState.get();
+
+      if (cacheResetCount > internalResetCount) {
+        if (!cacheLastState.compareAndSet(internalResetCount, cacheResetCount)) {
+          continue; // concurrent operation; don't risk moving cacheLastState backwards when a thread pauses for a long time
+        }
+
+        nameToIdCache.invalidateAll();
+        break;
+      }
+
+      break;
+    }
+
+    String tableId = getId(tableName);
+
     BatchWriter tbw = tableWriters.get(tableId);
     if (tbw == null) {
       tbw = new TableBatchWriter(tableId);
@@ -122,10 +222,10 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
     }
     return tbw;
   }
-  
+
   @Override
   public void flush() throws MutationsRejectedException {
     bw.flush();
   }
-  
+
 }
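
    (The getId method above has to unwrap loader failures because Guava's
    LoadingCache wraps checked loader exceptions in ExecutionException and
    unchecked ones in UncheckedExecutionException. A minimal illustration of
    that wrapping behavior, separate from the Accumulo code:)

    import java.util.concurrent.ExecutionException;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.google.common.util.concurrent.UncheckedExecutionException;

    public class CacheExceptionSketch {
      public static void main(String[] args) {
        LoadingCache<String,String> cache = CacheBuilder.newBuilder()
            .build(new CacheLoader<String,String>() {
              @Override
              public String load(String key) throws Exception {
                // An unchecked failure; a checked exception thrown here
                // would surface as ExecutionException instead.
                throw new IllegalStateException("no mapping for " + key);
              }
            });

        try {
          cache.get("missing");
        } catch (UncheckedExecutionException e) {
          // RuntimeExceptions from the loader arrive here, wrapped.
          System.out.println("unchecked cause: " + e.getCause());
        } catch (ExecutionException e) {
          // Checked exceptions from the loader arrive here, wrapped.
          System.out.println("checked cause: " + e.getCause());
        }
      }
    }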

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6b87c870/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
index 71518c5..d1b10d4 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
@@ -31,6 +32,7 @@ import org.apache.accumulo.fate.zookeeper.ZooCache;
 
 public class Tables {
   private static SecurityPermission TABLES_PERMISSION = new SecurityPermission("tablesPermission");
+  private static AtomicLong cacheResetCount = new AtomicLong(0);
   
   private static ZooCache getZooCache(Instance instance) {
     SecurityManager sm = System.getSecurityManager();
@@ -89,6 +91,7 @@ public class Tables {
   }
   
   public static void clearCache(Instance instance) {
+    cacheResetCount.incrementAndGet();
     getZooCache(instance).clear(ZooUtil.getRoot(instance) + Constants.ZTABLES);
   }
   
@@ -111,4 +114,8 @@ public class Tables {
     
     return TableState.valueOf(new String(state));
   }
+  
+  public static long getCacheResetCount() {
+    return cacheResetCount.get();
+  }
 }
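
    (The cacheResetCount counter added here pairs with the check-and-invalidate
    loop in getBatchWriter above. A standalone sketch of that optimistic
    pattern, with illustrative names rather than the Accumulo API:)

    import java.util.concurrent.atomic.AtomicLong;

    public class ResetCountSketch {
      // Incremented by whoever clears the shared table cache
      // (Tables.clearCache plays this role in the real code).
      static final AtomicLong globalResetCount = new AtomicLong(0);
      // The last reset count this consumer has acted upon.
      final AtomicLong lastSeenResetCount = new AtomicLong(0);

      void invalidateIfStale(Runnable invalidateAll) {
        while (true) {
          long current = globalResetCount.get();
          long seen = lastSeenResetCount.get();
          if (current <= seen) {
            return; // no resets since this consumer last looked
          }
          // CAS so a long-paused thread can never move the observed
          // count backwards past a newer observation.
          if (lastSeenResetCount.compareAndSet(seen, current)) {
            invalidateAll.run();
            return;
          }
          // Lost a race with another thread; re-read and retry.
        }
      }
    }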

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6b87c870/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
new file mode 100644
index 0000000..c5290e4
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterTest.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.impl.MultiTableBatchWriterImpl;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.CredentialHelper;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.google.common.collect.Maps;
+
+public class MultiTableBatchWriterTest {
+  public static TemporaryFolder folder = new TemporaryFolder();
+  public static MiniAccumuloCluster cluster;
+  private static final PasswordToken password = new PasswordToken("secret");
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    folder.create();
+    MiniAccumuloConfig cfg = new MiniAccumuloConfig(folder.newFolder("miniAccumulo"), new String(password.getPassword()));
+    cluster = new MiniAccumuloCluster(cfg);
+    cluster.start();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    cluster.stop();
+    folder.delete();
+  }
+  
+  @Test
+  public void testTableRenameDataValidation() throws Exception {
+    ZooKeeperInstance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector connector = instance.getConnector("root", password);
+
+    BatchWriterConfig config = new BatchWriterConfig();
+
+    TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
+    MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 60, TimeUnit.SECONDS);
+
+    try {
+      final String table1 = "testTableRenameDataValidation_table1", table2 = "testTableRenameDataValidation_table2";
+
+      TableOperations tops = connector.tableOperations();
+      tops.create(table1);
+
+      BatchWriter bw1 = mtbw.getBatchWriter(table1);
+
+      Mutation m1 = new Mutation("foo");
+      m1.put("col1", "", "val1");
+
+      bw1.addMutation(m1);
+
+      tops.rename(table1, table2);
+      tops.create(table1);
+
+      BatchWriter bw2 = mtbw.getBatchWriter(table1);
+      
+      Mutation m2 = new Mutation("bar");
+      m2.put("col1", "", "val1");
+
+      bw1.addMutation(m2);
+      bw2.addMutation(m2);
+
+      mtbw.close();
+
+      Map<Entry<String,String>,String> table1Expectations = new HashMap<Entry<String,String>,String>();
+      table1Expectations.put(Maps.immutableEntry("bar", "col1"), "val1");
+
+      Map<Entry<String,String>,String> table2Expectations = new HashMap<Entry<String,String>,String>();
+      table2Expectations.put(Maps.immutableEntry("foo", "col1"), "val1");
+      table2Expectations.put(Maps.immutableEntry("bar", "col1"), "val1");
+
+      Scanner s = connector.createScanner(table1, new Authorizations());
+      s.setRange(new Range());
+      Map<Entry<String,String>,String> actual = new HashMap<Entry<String,String>,String>();
+      for (Entry<Key,Value> entry : s) {
+        actual.put(Maps.immutableEntry(entry.getKey().getRow().toString(), entry.getKey().getColumnFamily().toString()), entry.getValue().toString());
+      }
+
+      Assert.assertEquals("Differing results for " + table1, table1Expectations, actual);
+
+      s = connector.createScanner(table2, new Authorizations());
+      s.setRange(new Range());
+      actual = new HashMap<Entry<String,String>,String>();
+      for (Entry<Key,Value> entry : s) {
+        actual.put(Maps.immutableEntry(entry.getKey().getRow().toString(), entry.getKey().getColumnFamily().toString()), entry.getValue().toString());
+      }
+
+      Assert.assertEquals("Differing results for " + table2, table2Expectations, actual);
+      
+    } finally {
+      if (null != mtbw) {
+        mtbw.close();
+      }
+    }
+  }
+
+  @Test
+  public void testTableRenameSameWriters() throws Exception {
+    ZooKeeperInstance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector connector = instance.getConnector("root", password);
+
+    BatchWriterConfig config = new BatchWriterConfig();
+
+    TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
+    MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 60, TimeUnit.SECONDS);
+
+    try {
+      final String table1 = "testTableRenameSameWriters_table1", table2 = "testTableRenameSameWriters_table2";
+      final String newTable1 = "testTableRenameSameWriters_newTable1", newTable2 = "testTableRenameSameWriters_newTable2";
+
+      TableOperations tops = connector.tableOperations();
+      tops.create(table1);
+      tops.create(table2);
+
+      BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = mtbw.getBatchWriter(table2);
+
+      Mutation m1 = new Mutation("foo");
+      m1.put("col1", "", "val1");
+      m1.put("col2", "", "val2");
+
+      bw1.addMutation(m1);
+      bw2.addMutation(m1);
+
+      tops.rename(table1, newTable1);
+      tops.rename(table2, newTable2);
+
+      Mutation m2 = new Mutation("bar");
+      m2.put("col1", "", "val1");
+      m2.put("col2", "", "val2");
+
+      bw1.addMutation(m2);
+      bw2.addMutation(m2);
+
+      mtbw.close();
+
+      Map<Entry<String,String>,String> expectations = new HashMap<Entry<String,String>,String>();
+      expectations.put(Maps.immutableEntry("foo", "col1"), "val1");
+      expectations.put(Maps.immutableEntry("foo", "col2"), "val2");
+      expectations.put(Maps.immutableEntry("bar", "col1"), "val1");
+      expectations.put(Maps.immutableEntry("bar", "col2"), "val2");
+
+      for (String table : Arrays.asList(newTable1, newTable2)) {
+        Scanner s = connector.createScanner(table, new Authorizations());
+        s.setRange(new Range());
+        Map<Entry<String,String>,String> actual = new HashMap<Entry<String,String>,String>();
+        for (Entry<Key,Value> entry : s) {
+          actual.put(Maps.immutableEntry(entry.getKey().getRow().toString(), entry.getKey().getColumnFamily().toString()), entry.getValue().toString());
+        }
+
+        Assert.assertEquals("Differing results for " + table, expectations, actual);
+      }
+    } finally {
+      if (null != mtbw) {
+        mtbw.close();
+      }
+    }
+  }
+
+  @Test
+  public void testTableRenameNewWriters() throws Exception {
+    ZooKeeperInstance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector connector = instance.getConnector("root", password);
+
+    BatchWriterConfig config = new BatchWriterConfig();
+
+    TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
+    MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 60, TimeUnit.SECONDS);
+
+    try {
+      final String table1 = "testTableRenameNewWriters_table1", table2 = "testTableRenameNewWriters_table2";
+      final String newTable1 = "testTableRenameNewWriters_newTable1", newTable2 = "testTableRenameNewWriters_newTable2";
+
+      TableOperations tops = connector.tableOperations();
+      tops.create(table1);
+      tops.create(table2);
+
+      BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = mtbw.getBatchWriter(table2);
+
+      Mutation m1 = new Mutation("foo");
+      m1.put("col1", "", "val1");
+      m1.put("col2", "", "val2");
+
+      bw1.addMutation(m1);
+      bw2.addMutation(m1);
+
+      tops.rename(table1, newTable1);
+
+      // MTBW may still have this name cached against the old table ID, but it
+      // should invalidate its cache after seeing the rename
+      try {
+        bw1 = mtbw.getBatchWriter(table1);
+        Assert.fail("Should not be able to find this table");
+      } catch (TableNotFoundException e) {
+        // pass
+      }
+
+      tops.rename(table2, newTable2);
+      
+      try {
+        bw2 = mtbw.getBatchWriter(table2);
+        Assert.fail("Should not be able to find this table");
+      } catch (TableNotFoundException e) {
+        // pass
+      }
+      
+      bw1 = mtbw.getBatchWriter(newTable1);
+      bw2 = mtbw.getBatchWriter(newTable2);
+
+      Mutation m2 = new Mutation("bar");
+      m2.put("col1", "", "val1");
+      m2.put("col2", "", "val2");
+
+      bw1.addMutation(m2);
+      bw2.addMutation(m2);
+
+      mtbw.close();
+
+      Map<Entry<String,String>,String> expectations = new HashMap<Entry<String,String>,String>();
+      expectations.put(Maps.immutableEntry("foo", "col1"), "val1");
+      expectations.put(Maps.immutableEntry("foo", "col2"), "val2");
+      expectations.put(Maps.immutableEntry("bar", "col1"), "val1");
+      expectations.put(Maps.immutableEntry("bar", "col2"), "val2");
+
+      for (String table : Arrays.asList(newTable1, newTable2)) {
+        Scanner s = connector.createScanner(table, new Authorizations());
+        s.setRange(new Range());
+        Map<Entry<String,String>,String> actual = new HashMap<Entry<String,String>,String>();
+        for (Entry<Key,Value> entry : s) {
+          actual.put(Maps.immutableEntry(entry.getKey().getRow().toString(), entry.getKey().getColumnFamily().toString()), entry.getValue().toString());
+        }
+
+        Assert.assertEquals("Differing results for " + table, expectations, actual);
+      }
+    } finally {
+      if (null != mtbw) {
+        mtbw.close();
+      }
+    }
+  }
+
+  @Test
+  public void testTableRenameNewWritersNoCaching() throws Exception {
+    ZooKeeperInstance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector connector = instance.getConnector("root", password);
+
+    BatchWriterConfig config = new BatchWriterConfig();
+    
+    TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
+    MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 0, TimeUnit.SECONDS);
+
+    try {
+      final String table1 = "testTableRenameNewWritersNoCaching_table1", table2 = "testTableRenameNewWritersNoCaching_table2";
+      final String newTable1 = "testTableRenameNewWritersNoCaching_newTable1", newTable2 = "testTableRenameNewWritersNoCaching_newTable2";
+
+      TableOperations tops = connector.tableOperations();
+      tops.create(table1);
+      tops.create(table2);
+
+      BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = mtbw.getBatchWriter(table2);
+
+      Mutation m1 = new Mutation("foo");
+      m1.put("col1", "", "val1");
+      m1.put("col2", "", "val2");
+
+      bw1.addMutation(m1);
+      bw2.addMutation(m1);
+
+      tops.rename(table1, newTable1);
+      tops.rename(table2, newTable2);
+
+      try {
+        bw1 = mtbw.getBatchWriter(table1);
+        Assert.fail("Should not have gotten batchwriter for " + table1);
+      } catch (TableNotFoundException e) {
+        // Pass
+      }
+
+      try {
+        bw2 = mtbw.getBatchWriter(table2);
+        Assert.fail("Should not have gotten batchwriter for " + table2);
+      } catch (TableNotFoundException e) {
+        // Pass
+      }
+    } finally {
+      if (null != mtbw) {
+        mtbw.close();
+      }
+    }
+  }
+
+  @Test
+  public void testTableDelete() throws Exception {
+    ZooKeeperInstance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector connector = instance.getConnector("root", password);
+
+    BatchWriterConfig config = new BatchWriterConfig();
+
+    TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
+    MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 60, TimeUnit.SECONDS);
+
+    try {
+      final String table1 = "testTableDelete_table1", table2 = "testTableDelete_table2";
+
+      TableOperations tops = connector.tableOperations();
+      tops.create(table1);
+      tops.create(table2);
+
+      BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = mtbw.getBatchWriter(table2);
+
+      Mutation m1 = new Mutation("foo");
+      m1.put("col1", "", "val1");
+      m1.put("col2", "", "val2");
+
+      bw1.addMutation(m1);
+      bw2.addMutation(m1);
+
+      tops.delete(table1);
+      tops.delete(table2);
+
+      Mutation m2 = new Mutation("bar");
+      m2.put("col1", "", "val1");
+      m2.put("col2", "", "val2");
+
+      bw1.addMutation(m2);
+      bw2.addMutation(m2);
+
+    } finally {
+      if (null != mtbw) {
+        try {
+          mtbw.close();
+          Assert.fail("Should not be able to close batch writers");
+        } catch (MutationsRejectedException e) {
+          // Pass
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testOfflineTable() throws Exception {
+
+    ZooKeeperInstance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector connector = instance.getConnector("root", password);
+
+    BatchWriterConfig config = new BatchWriterConfig();
+
+    TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
+    MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 60, TimeUnit.SECONDS);
+
+    try {
+      final String table1 = "testOfflineTable_table1", table2 = "testOfflineTable_table2";
+
+      TableOperations tops = connector.tableOperations();
+      tops.create(table1);
+      tops.create(table2);
+
+      BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = mtbw.getBatchWriter(table2);
+
+      Mutation m1 = new Mutation("foo");
+      m1.put("col1", "", "val1");
+      m1.put("col2", "", "val2");
+
+      bw1.addMutation(m1);
+      bw2.addMutation(m1);
+
+      tops.offline(table1);
+      tops.offline(table2);
+
+      Mutation m2 = new Mutation("bar");
+      m2.put("col1", "", "val1");
+      m2.put("col2", "", "val2");
+
+      bw1.addMutation(m2);
+      bw2.addMutation(m2);
+    } finally {
+      if (null != mtbw) {
+        try {
+          mtbw.close();
+          Assert.fail("Should not be able to close batch writers");
+        } catch (MutationsRejectedException e) {
+          // Pass
+        }
+      }
+
+    }
+  }
+
+  @Test
+  public void testOfflineTableWithCache() throws Exception {
+
+    ZooKeeperInstance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector connector = instance.getConnector("root", password);
+
+    BatchWriterConfig config = new BatchWriterConfig();
+    
+    TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
+    MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 60, TimeUnit.SECONDS);
+
+    try {
+      final String table1 = "testOfflineTableWithCache_table1", table2 = "testOfflineTableWithCache_table2";
+
+      TableOperations tops = connector.tableOperations();
+      tops.create(table1);
+      tops.create(table2);
+
+      BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = mtbw.getBatchWriter(table2);
+
+      Mutation m1 = new Mutation("foo");
+      m1.put("col1", "", "val1");
+      m1.put("col2", "", "val2");
+
+      bw1.addMutation(m1);
+      bw2.addMutation(m1);
+
+      tops.offline(table1);
+
+      try {
+        bw1 = mtbw.getBatchWriter(table1);
+      } catch (TableOfflineException e) {
+        // pass
+      }
+
+      tops.offline(table2);
+
+      try {
+        bw2 = mtbw.getBatchWriter(table2);
+      } catch (TableOfflineException e) {
+        // pass
+      }
+    } finally {
+      if (null != mtbw) {
+        try {
+          mtbw.close();
+          Assert.fail("Expecting close on MTBW to fail due to offline tables");
+        } catch (MutationsRejectedException e) {
+          // Pass
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testOfflineTableWithoutCache() throws Exception {
+
+    ZooKeeperInstance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector connector = instance.getConnector("root", password);
+
+    BatchWriterConfig config = new BatchWriterConfig();
+
+    TCredentials creds = CredentialHelper.create("root", password, instance.getInstanceID());
+    MultiTableBatchWriter mtbw = new MultiTableBatchWriterImpl(instance, creds, config, 0, TimeUnit.SECONDS);
+
+    try {
+      final String table1 = "testOfflineTableWithoutCache_table1", table2 = "testOfflineTableWithoutCache_table2";
+
+      TableOperations tops = connector.tableOperations();
+      tops.create(table1);
+      tops.create(table2);
+
+      BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = mtbw.getBatchWriter(table2);
+
+      Mutation m1 = new Mutation("foo");
+      m1.put("col1", "", "val1");
+      m1.put("col2", "", "val2");
+
+      bw1.addMutation(m1);
+      bw2.addMutation(m1);
+
+      tops.offline(table1);
+      tops.offline(table2);
+
+      try {
+        bw1 = mtbw.getBatchWriter(table1);
+        Assert.fail(table1 + " should be offline");
+      } catch (TableOfflineException e) {
+        // pass
+      }
+
+      try {
+        bw2 = mtbw.getBatchWriter(table2);
+        Assert.fail(table2 + " should be offline");
+      } catch (TableOfflineException e) {
+        // pass
+      }
+    } finally {
+      if (null != mtbw) {
+        try {
+          mtbw.close();
+          Assert.fail("Expecting close on MTBW to fail due to offline tables");
+        } catch (MutationsRejectedException e) {
+          // Pass
+        }
+      }
+    }
+  }
+}
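
    (For readers unfamiliar with the class under test, a minimal usage sketch
    of the MultiTableBatchWriter through the public Connector API of this era;
    the instance name, keepers, and credentials below are placeholders:)

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.MultiTableBatchWriter;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.accumulo.core.data.Mutation;

    public class MtbwUsageSketch {
      public static void main(String[] args) throws Exception {
        // Placeholders; a real client supplies its own instance and keepers.
        ZooKeeperInstance instance = new ZooKeeperInstance("instance", "localhost:2181");
        Connector connector = instance.getConnector("root", new PasswordToken("secret"));

        MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
        try {
          // All per-table writers share one underlying TabletServerBatchWriter.
          BatchWriter bw = mtbw.getBatchWriter("table1");
          Mutation m = new Mutation("row");
          m.put("col1", "", "val1");
          bw.addMutation(m);
        } finally {
          // Flushes and closes writers for every table; per-table close() and
          // flush() are unsupported, as the inner TableBatchWriter shows.
          mtbw.close();
        }
      }
    }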


[2/3] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Posted by el...@apache.org.
Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/45fbee69
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/45fbee69
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/45fbee69

Branch: refs/heads/master
Commit: 45fbee6937549048c74fe176f201c246de0f5e0a
Parents: 0b8cca0 6b87c87
Author: Josh Elser <el...@apache.org>
Authored: Sun Nov 17 22:08:31 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Sun Nov 17 22:26:13 2013 -0500

----------------------------------------------------------------------
 .../client/impl/MultiTableBatchWriterImpl.java  | 168 ++++--
 .../accumulo/core/client/impl/Tables.java       |   7 +
 .../test/MultiTableBatchWriterTest.java         | 539 +++++++++++++++++++
 3 files changed, 679 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/45fbee69/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
index 0d2d44d,49c02d9..7eea455
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
@@@ -65,32 -78,60 +78,60 @@@ public class MultiTableBatchWriterImpl 
      public void flush() {
        throw new UnsupportedOperationException("Must flush all tables, can not flush an individual table");
      }
-     
+ 
+   }
+ 
+   /**
+    * CacheLoader which will look up the internal table ID for a given table name.
+    */
+   private class TableNameToIdLoader extends CacheLoader<String,String> {
+ 
+     @Override
+     public String load(String tableName) throws Exception {
+       String tableId = Tables.getNameToIdMap(instance).get(tableName);
+ 
+       if (tableId == null)
+         throw new TableNotFoundException(tableId, tableName, null);
+ 
+       if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+         throw new TableOfflineException(instance, tableId);
+ 
+       return tableId;
+     }
+ 
    }
-   
+ 
    private TabletServerBatchWriter bw;
-   private HashMap<String,BatchWriter> tableWriters;
+   private ConcurrentHashMap<String,BatchWriter> tableWriters;
    private Instance instance;
-   
+   private final LoadingCache<String,String> nameToIdCache;
+ 
 -  public MultiTableBatchWriterImpl(Instance instance, TCredentials credentials, BatchWriterConfig config) {
 +  public MultiTableBatchWriterImpl(Instance instance, Credentials credentials, BatchWriterConfig config) {
-     ArgumentChecker.notNull(instance, credentials);
+     this(instance, credentials, config, DEFAULT_CACHE_TIME, DEFAULT_CACHE_TIME_UNIT);
+   }
+ 
 -  public MultiTableBatchWriterImpl(Instance instance, TCredentials credentials, BatchWriterConfig config, long cacheTime, TimeUnit cacheTimeUnit) {
++  public MultiTableBatchWriterImpl(Instance instance, Credentials credentials, BatchWriterConfig config, long cacheTime, TimeUnit cacheTimeUnit) {
+     ArgumentChecker.notNull(instance, credentials, config, cacheTimeUnit);
      this.instance = instance;
      this.bw = new TabletServerBatchWriter(instance, credentials, config);
-     tableWriters = new HashMap<String,BatchWriter>();
-     this.closed = false;
+     tableWriters = new ConcurrentHashMap<String,BatchWriter>();
+     this.closed = new AtomicBoolean(false);
+     this.cacheLastState = new AtomicLong(0);
+ 
+     // Potentially up to ~500k used to cache names to IDs with "segments" of (maybe) ~1000 entries
+     nameToIdCache = CacheBuilder.newBuilder().expireAfterWrite(cacheTime, cacheTimeUnit).concurrencyLevel(10).maximumSize(10000).initialCapacity(20)
+         .build(new TableNameToIdLoader());
    }
-   
-   @Override
+ 
    public boolean isClosed() {
-     return this.closed;
+     return this.closed.get();
    }
-   
-   @Override
+ 
    public void close() throws MutationsRejectedException {
+     this.closed.set(true);
      bw.close();
-     this.closed = true;
    }
-   
+ 
    /**
     * Warning: do not rely upon finalize to close this class. Finalize is not guaranteed to be called.
     */

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45fbee69/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
----------------------------------------------------------------------


[3/3] git commit: Merge branch '1.6.0-SNAPSHOT'

Posted by el...@apache.org.
Merge branch '1.6.0-SNAPSHOT'


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/edf8e28a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/edf8e28a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/edf8e28a

Branch: refs/heads/master
Commit: edf8e28a2ddc34c385d904bb274ded0214486042
Parents: d0b39ca 45fbee6
Author: Josh Elser <el...@apache.org>
Authored: Sun Nov 17 22:27:35 2013 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Sun Nov 17 22:27:35 2013 -0500

----------------------------------------------------------------------
 .../client/impl/MultiTableBatchWriterImpl.java  | 168 ++++--
 .../accumulo/core/client/impl/Tables.java       |   7 +
 .../test/MultiTableBatchWriterTest.java         | 539 +++++++++++++++++++
 3 files changed, 679 insertions(+), 35 deletions(-)
----------------------------------------------------------------------