You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/01/12 20:51:40 UTC

[07/13] accumulo git commit: Merge branch '1.6' into 1.7

Merge branch '1.6' into 1.7

Conflicts:
	test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
	test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/94f4a19c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/94f4a19c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/94f4a19c

Branch: refs/heads/1.7
Commit: 94f4a19c0d776f3a57e855863c34e9e3164b615b
Parents: 6562828 29c6052
Author: Josh Elser <el...@apache.org>
Authored: Tue Jan 12 12:25:34 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Jan 12 12:25:34 2016 -0500

----------------------------------------------------------------------
 test/src/test/java/org/apache/accumulo/test/CleanWalIT.java        | 2 +-
 .../java/org/apache/accumulo/test/DetectDeadTabletServersIT.java   | 2 +-
 test/src/test/java/org/apache/accumulo/test/ExistingMacIT.java     | 2 +-
 .../org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java    | 2 +-
 .../test/java/org/apache/accumulo/test/MultiTableRecoveryIT.java   | 2 +-
 .../org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java  | 2 +-
 .../test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java  | 2 +-
 .../test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java | 2 +-
 .../java/org/apache/accumulo/test/functional/BinaryStressIT.java   | 2 +-
 .../test/java/org/apache/accumulo/test/functional/CleanTmpIT.java  | 2 +-
 .../java/org/apache/accumulo/test/functional/CompactionIT.java     | 2 +-
 .../java/org/apache/accumulo/test/functional/DurabilityIT.java     | 2 +-
 .../org/apache/accumulo/test/functional/GarbageCollectorIT.java    | 2 +-
 .../org/apache/accumulo/test/functional/KerberosRenewalIT.java     | 2 +-
 .../java/org/apache/accumulo/test/functional/MasterFailoverIT.java | 2 +-
 .../test/java/org/apache/accumulo/test/functional/RestartIT.java   | 2 +-
 .../java/org/apache/accumulo/test/functional/RestartStressIT.java  | 2 +-
 .../org/apache/accumulo/test/functional/SessionDurabilityIT.java   | 2 +-
 .../java/org/apache/accumulo/test/functional/WriteAheadLogIT.java  | 2 +-
 .../org/apache/accumulo/test/functional/ZookeeperRestartIT.java    | 2 +-
 .../accumulo/test/replication/MultiInstanceReplicationIT.java      | 2 +-
 .../test/replication/UnorderedWorkAssignerReplicationIT.java       | 2 +-
 22 files changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
index d754a14,b2298f7..08e3c09
--- a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
@@@ -57,10 -54,9 +57,10 @@@ public class CleanWalIT extends Accumul
  
    @Override
    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
--    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
      cfg.setNumTservers(1);
 -    cfg.useMiniDFS(true);
 +    // use raw local file system so walogs sync and flush will work
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
    }
  
    @Before

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
index 04c781b,1e65601..f7ee089
--- a/test/src/test/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
@@@ -42,7 -41,7 +42,7 @@@ public class DetectDeadTabletServersIT 
  
    @Override
    protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
--    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
    }
  
    @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/ExistingMacIT.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java
index 1a3c92f,0000000..57c2c34
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java
@@@ -1,101 -1,0 +1,101 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.cluster.ClusterControl;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloClusterIT;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import com.google.common.collect.Iterators;
 +
 +// Accumulo3010
 +public class RecoveryCompactionsAreFlushesIT extends AccumuloClusterIT {
 +
 +  @Override
 +  public int defaultTimeoutSeconds() {
 +    return 60;
 +  }
 +
 +  @Override
 +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.setNumTservers(1);
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    // file system supports recovery
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
 +  }
 +
 +  @Test
 +  public void test() throws Exception {
 +    // create a table
 +    String tableName = getUniqueNames(1)[0];
 +    Connector c = getConnector();
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "100");
 +    c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "3");
 +    // create 3 flush files
 +    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 +    Mutation m = new Mutation("a");
 +    m.put("b", "c", new Value("v".getBytes()));
 +    for (int i = 0; i < 3; i++) {
 +      bw.addMutation(m);
 +      bw.flush();
 +      c.tableOperations().flush(tableName, null, null, true);
 +    }
 +    // create an unsaved mutation
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    ClusterControl control = cluster.getClusterControl();
 +
 +    // kill the tablet servers
 +    control.stopAllServers(ServerType.TABLET_SERVER);
 +
 +    // recover
 +    control.startAllServers(ServerType.TABLET_SERVER);
 +
 +    // ensure the table is readable
 +    Iterators.size(c.createScanner(tableName, Authorizations.EMPTY).iterator());
 +
 +    // ensure that the recovery was not a merging minor compaction
 +    Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
 +    for (Entry<Key,Value> entry : s) {
 +      String filename = entry.getKey().getColumnQualifier().toString();
 +      String parts[] = filename.split("/");
 +      Assert.assertFalse(parts[parts.length - 1].startsWith("M"));
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
index 081ee85,0000000..68bd07b
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
@@@ -1,75 -1,0 +1,75 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +import java.util.TreeSet;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.functional.ConfigurableMacIT;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
 +// ACCUMULO-2480
 +public class TabletServerGivesUpIT extends ConfigurableMacIT {
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.useMiniDFS(true);
 +    cfg.setNumTservers(1);
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setProperty(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES, "15");
 +    cfg.setProperty(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION, "0s");
 +  }
 +
 +  @Test(timeout = 30 * 1000)
 +  public void test() throws Exception {
 +    final Connector conn = this.getConnector();
 +    // Yes, there's a tabletserver
 +    assertEquals(1, conn.instanceOperations().getTabletServers().size());
 +    final String tableName = getUniqueNames(1)[0];
 +    conn.tableOperations().create(tableName);
 +    // Kill dfs
 +    cluster.getMiniDfs().shutdown();
 +    // ask the tserver to do something
 +    final AtomicReference<Exception> ex = new AtomicReference<>();
 +    Thread splitter = new Thread() {
 +      @Override
 +      public void run() {
 +        try {
 +          TreeSet<Text> splits = new TreeSet<>();
 +          splits.add(new Text("X"));
 +          conn.tableOperations().addSplits(tableName, splits);
 +        } catch (Exception e) {
 +          ex.set(e);
 +        }
 +      }
 +    };
 +    splitter.start();
 +    // wait for the tserver to give up on writing to the WAL
 +    while (conn.instanceOperations().getTabletServers().size() == 1) {
 +      UtilWaitThread.sleep(1000);
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
index 8338021,0000000..c318075
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
@@@ -1,107 -1,0 +1,107 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.minicluster.impl.ProcessReference;
 +import org.apache.accumulo.server.util.Admin;
 +import org.apache.accumulo.test.functional.ConfigurableMacIT;
 +import org.apache.accumulo.test.functional.FunctionalTestUtils;
 +import org.apache.accumulo.tserver.TabletServer;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
 +import com.google.common.collect.Iterators;
 +
 +public class VerifySerialRecoveryIT extends ConfigurableMacIT {
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.setNumTservers(1);
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "20");
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
 +  }
 +
 +  @Test(timeout = 4 * 60 * 1000)
 +  public void testSerializedRecovery() throws Exception {
 +    // make a table with many splits
 +    String tableName = getUniqueNames(1)[0];
 +    Connector c = getConnector();
 +    c.tableOperations().create(tableName);
 +    SortedSet<Text> splits = new TreeSet<Text>();
 +    for (int i = 0; i < 200; i++) {
 +      splits.add(new Text(AssignmentThreadsIT.randomHex(8)));
 +    }
 +    c.tableOperations().addSplits(tableName, splits);
 +    // load data to give the recovery something to do
 +    BatchWriter bw = c.createBatchWriter(tableName, null);
 +    for (int i = 0; i < 50000; i++) {
 +      Mutation m = new Mutation(AssignmentThreadsIT.randomHex(8));
 +      m.put("", "", "");
 +      bw.addMutation(m);
 +    }
 +    bw.close();
 +    // kill the tserver
 +    for (ProcessReference ref : getCluster().getProcesses().get(ServerType.TABLET_SERVER))
 +      getCluster().killProcess(ServerType.TABLET_SERVER, ref);
 +    final Process ts = cluster.exec(TabletServer.class);
 +
 +    // wait for recovery
 +    Iterators.size(c.createScanner(tableName, Authorizations.EMPTY).iterator());
 +    assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
 +    ts.waitFor();
 +    String result = FunctionalTestUtils.readAll(cluster, TabletServer.class, ts);
 +    for (String line : result.split("\n")) {
 +      System.out.println(line);
 +    }
 +    // walk through the output, verifying that only a single normal recovery was running at one time
 +    boolean started = false;
 +    int recoveries = 0;
 +    for (String line : result.split("\n")) {
 +      // ignore metadata tables
 +      if (line.contains("!0") || line.contains("+r"))
 +        continue;
 +      if (line.contains("Starting Write-Ahead Log")) {
 +        assertFalse(started);
 +        started = true;
 +        recoveries++;
 +      }
 +      if (line.contains("Write-Ahead Log recovery complete")) {
 +        assertTrue(started);
 +        started = false;
 +      }
 +    }
 +    assertFalse(started);
 +    assertTrue(recoveries > 0);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
index 62d8738,fbe504e..f1bca1b
--- a/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
@@@ -50,9 -51,10 +50,9 @@@ public class BinaryStressIT extends Acc
  
    @Override
    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
 -    Map<String,String> siteConfig = new HashMap<String,String>();
 -    siteConfig.put(Property.TSERV_MAXMEM.getKey(), "50K");
 -    siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "0");
 -    cfg.setSiteConfig(siteConfig);
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setProperty(Property.TSERV_MAXMEM, "50K");
 +    cfg.setProperty(Property.TSERV_MAJC_DELAY, "0");
    }
  
    private String majcDelay, maxMem;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
index 921d661,65422c9..d03007e
--- a/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
@@@ -51,11 -53,12 +51,11 @@@ public class CleanTmpIT extends Configu
    private static final Logger log = LoggerFactory.getLogger(CleanTmpIT.class);
  
    @Override
 -  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 -    Map<String,String> props = new HashMap<String,String>();
 -    props.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "3s");
 -    cfg.setSiteConfig(props);
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
      cfg.setNumTservers(1);
 -    cfg.useMiniDFS(true);
 +    // use raw local file system so walogs sync and flush will work
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
index 2fe5470,907d17d..818dbc4
--- a/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
@@@ -58,12 -51,11 +58,12 @@@ public class CompactionIT extends Accum
  
    @Override
    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
 -    Map<String,String> map = new HashMap<String,String>();
 -    map.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "4");
 -    map.put(Property.TSERV_MAJC_DELAY.getKey(), "1");
 -    map.put(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), "1");
 -    cfg.setSiteConfig(map);
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN, "4");
 +    cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
 +    cfg.setProperty(Property.TSERV_MAJC_MAXCONCURRENT, "1");
 +    // use raw local file system so walogs sync and flush will work
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
index 9a262a1,0000000..819347e
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
@@@ -1,222 -1,0 +1,222 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.minicluster.impl.ProcessReference;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Iterators;
 +
 +public class DurabilityIT extends ConfigurableMacIT {
 +  private static final Logger log = LoggerFactory.getLogger(DurabilityIT.class);
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setNumTservers(1);
 +  }
 +
 +  static final long N = 100000;
 +
 +  private String[] init() throws Exception {
 +    String[] tableNames = getUniqueNames(4);
 +    Connector c = getConnector();
 +    TableOperations tableOps = c.tableOperations();
 +    createTable(tableNames[0]);
 +    createTable(tableNames[1]);
 +    createTable(tableNames[2]);
 +    createTable(tableNames[3]);
 +    // default is sync
 +    tableOps.setProperty(tableNames[1], Property.TABLE_DURABILITY.getKey(), "flush");
 +    tableOps.setProperty(tableNames[2], Property.TABLE_DURABILITY.getKey(), "log");
 +    tableOps.setProperty(tableNames[3], Property.TABLE_DURABILITY.getKey(), "none");
 +    return tableNames;
 +  }
 +
 +  private void cleanup(String[] tableNames) throws Exception {
 +    Connector c = getConnector();
 +    for (String tableName : tableNames) {
 +      c.tableOperations().delete(tableName);
 +    }
 +  }
 +
 +  private void createTable(String tableName) throws Exception {
 +    TableOperations tableOps = getConnector().tableOperations();
 +    tableOps.create(tableName);
 +  }
 +
 +  @Test(timeout = 2 * 60 * 1000)
 +  public void testWriteSpeed() throws Exception {
 +    TableOperations tableOps = getConnector().tableOperations();
 +    String tableNames[] = init();
 +    // write some gunk, delete the table to keep that table from messing with the performance numbers of successive calls
 +    // sync
 +    long t0 = writeSome(tableNames[0], N);
 +    tableOps.delete(tableNames[0]);
 +    // flush
 +    long t1 = writeSome(tableNames[1], N);
 +    tableOps.delete(tableNames[1]);
 +    // log
 +    long t2 = writeSome(tableNames[2], N);
 +    tableOps.delete(tableNames[2]);
 +    // none
 +    long t3 = writeSome(tableNames[3], N);
 +    tableOps.delete(tableNames[3]);
 +    System.out.println(String.format("sync %d flush %d log %d none %d", t0, t1, t2, t3));
 +    assertTrue("flush should be faster than sync", t0 > t1);
 +    assertTrue("log should be faster than flush", t1 > t2);
 +    assertTrue("no durability should be faster than log", t2 > t3);
 +  }
 +
 +  @Test(timeout = 4 * 60 * 1000)
 +  public void testSync() throws Exception {
 +    String tableNames[] = init();
 +    // sync table should lose nothing
 +    writeSome(tableNames[0], N);
 +    restartTServer();
 +    assertEquals(N, readSome(tableNames[0]));
 +    cleanup(tableNames);
 +  }
 +
 +  @Test(timeout = 4 * 60 * 1000)
 +  public void testFlush() throws Exception {
 +    String tableNames[] = init();
 +    // flush table won't lose anything since we're not losing power/dfs
 +    writeSome(tableNames[1], N);
 +    restartTServer();
 +    assertEquals(N, readSome(tableNames[1]));
 +    cleanup(tableNames);
 +  }
 +
 +  @Test(timeout = 4 * 60 * 1000)
 +  public void testLog() throws Exception {
 +    String tableNames[] = init();
 +    // we're probably going to lose something the the log setting
 +    writeSome(tableNames[2], N);
 +    restartTServer();
 +    long numResults = readSome(tableNames[2]);
 +    assertTrue("Expected " + N + " >= " + numResults, N >= numResults);
 +    cleanup(tableNames);
 +  }
 +
 +  @Test(timeout = 4 * 60 * 1000)
 +  public void testNone() throws Exception {
 +    String tableNames[] = init();
 +    // probably won't get any data back without logging
 +    writeSome(tableNames[3], N);
 +    restartTServer();
 +    long numResults = readSome(tableNames[3]);
 +    assertTrue("Expected " + N + " >= " + numResults, N >= numResults);
 +    cleanup(tableNames);
 +  }
 +
 +  @Test(timeout = 4 * 60 * 1000)
 +  public void testIncreaseDurability() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none");
 +    writeSome(tableName, N);
 +    restartTServer();
 +    long numResults = readSome(tableName);
 +    assertTrue("Expected " + N + " >= " + numResults, N >= numResults);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync");
 +    writeSome(tableName, N);
 +    restartTServer();
 +    assertTrue(N == readSome(tableName));
 +  }
 +
 +  private static Map<String,String> map(Iterable<Entry<String,String>> entries) {
 +    Map<String,String> result = new HashMap<String,String>();
 +    for (Entry<String,String> entry : entries) {
 +      result.put(entry.getKey(), entry.getValue());
 +    }
 +    return result;
 +  }
 +
 +  @Test(timeout = 4 * 60 * 1000)
 +  public void testMetaDurability() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.instanceOperations().setProperty(Property.TABLE_DURABILITY.getKey(), "none");
 +    Map<String,String> props = map(c.tableOperations().getProperties(MetadataTable.NAME));
 +    assertEquals("sync", props.get(Property.TABLE_DURABILITY.getKey()));
 +    c.tableOperations().create(tableName);
 +    props = map(c.tableOperations().getProperties(tableName));
 +    assertEquals("none", props.get(Property.TABLE_DURABILITY.getKey()));
 +    restartTServer();
 +    assertTrue(c.tableOperations().exists(tableName));
 +  }
 +
 +  private long readSome(String table) throws Exception {
 +    return Iterators.size(getConnector().createScanner(table, Authorizations.EMPTY).iterator());
 +  }
 +
 +  private void restartTServer() throws Exception {
 +    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
 +      cluster.killProcess(ServerType.TABLET_SERVER, proc);
 +    }
 +    cluster.start();
 +  }
 +
 +  private long writeSome(String table, long count) throws Exception {
 +    int iterations = 5;
 +    long[] attempts = new long[iterations];
 +    for (int attempt = 0; attempt < iterations; attempt++) {
 +      long now = System.currentTimeMillis();
 +      Connector c = getConnector();
 +      BatchWriter bw = c.createBatchWriter(table, null);
 +      for (int i = 1; i < count + 1; i++) {
 +        Mutation m = new Mutation("" + i);
 +        m.put("", "", "");
 +        bw.addMutation(m);
 +        if (i % (Math.max(1, count / 100)) == 0) {
 +          bw.flush();
 +        }
 +      }
 +      bw.close();
 +      attempts[attempt] = System.currentTimeMillis() - now;
 +    }
 +    Arrays.sort(attempts);
 +    log.info("Attempt durations: {}", Arrays.toString(attempts));
 +    // Return the median duration
 +    return attempts[2];
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index 132cbcc,d5b92cf..6ab0541
--- a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@@ -80,13 -80,14 +80,13 @@@ public class GarbageCollectorIT extend
  
    @Override
    public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
 -    Map<String,String> settings = new HashMap<String,String>();
 -    settings.put(Property.INSTANCE_SECRET.getKey(), OUR_SECRET);
 -    settings.put(Property.GC_CYCLE_START.getKey(), "1");
 -    settings.put(Property.GC_CYCLE_DELAY.getKey(), "1");
 -    settings.put(Property.GC_PORT.getKey(), "0");
 -    settings.put(Property.TSERV_MAXMEM.getKey(), "5K");
 -    settings.put(Property.TSERV_MAJC_DELAY.getKey(), "1");
 -    cfg.setSiteConfig(settings);
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setProperty(Property.INSTANCE_SECRET, OUR_SECRET);
 +    cfg.setProperty(Property.GC_CYCLE_START, "1");
 +    cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
 +    cfg.setProperty(Property.GC_PORT, "0");
 +    cfg.setProperty(Property.TSERV_MAXMEM, "5K");
 +    cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
  
      // use raw local file system so walogs sync and flush will work
      hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java
index 19908f6,0000000..28c1dfc
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java
@@@ -1,188 -1,0 +1,188 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +import java.util.Map;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.cluster.ClusterUser;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableExistsException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.CompactionConfig;
 +import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloIT;
 +import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 +import org.apache.accumulo.harness.MiniClusterHarness;
 +import org.apache.accumulo.harness.TestingKdc;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 +import org.apache.hadoop.minikdc.MiniKdc;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.junit.After;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Iterables;
 +
 +/**
 + * MAC test which uses {@link MiniKdc} to simulate ta secure environment. Can be used as a sanity check for Kerberos/SASL testing.
 + */
 +public class KerberosRenewalIT extends AccumuloIT {
 +  private static final Logger log = LoggerFactory.getLogger(KerberosRenewalIT.class);
 +
 +  private static TestingKdc kdc;
 +  private static String krbEnabledForITs = null;
 +  private static ClusterUser rootUser;
 +
 +  private static final long TICKET_LIFETIME = 6 * 60 * 1000; // Anything less seems to fail when generating the ticket
 +  private static final long TICKET_TEST_LIFETIME = 8 * 60 * 1000; // Run a test for 8 mins
 +  private static final long TEST_DURATION = 9 * 60 * 1000; // The test should finish within 9 mins
 +
 +  @BeforeClass
 +  public static void startKdc() throws Exception {
 +    // 30s renewal time window
 +    kdc = new TestingKdc(TestingKdc.computeKdcDir(), TestingKdc.computeKeytabDir(), TICKET_LIFETIME);
 +    kdc.start();
 +    krbEnabledForITs = System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION);
 +    if (null == krbEnabledForITs || !Boolean.parseBoolean(krbEnabledForITs)) {
 +      System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, "true");
 +    }
 +    rootUser = kdc.getRootUser();
 +  }
 +
 +  @AfterClass
 +  public static void stopKdc() throws Exception {
 +    if (null != kdc) {
 +      kdc.stop();
 +    }
 +    if (null != krbEnabledForITs) {
 +      System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, krbEnabledForITs);
 +    }
 +  }
 +
 +  @Override
 +  public int defaultTimeoutSeconds() {
 +    return (int) TEST_DURATION / 1000;
 +  }
 +
 +  private MiniAccumuloClusterImpl mac;
 +
 +  @Before
 +  public void startMac() throws Exception {
 +    MiniClusterHarness harness = new MiniClusterHarness();
 +    mac = harness.create(this, new PasswordToken("unused"), kdc, new MiniClusterConfigurationCallback() {
 +
 +      @Override
 +      public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
 +        Map<String,String> site = cfg.getSiteConfig();
-         site.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10s");
++        site.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s");
 +        // Reduce the period just to make sure we trigger renewal fast
 +        site.put(Property.GENERAL_KERBEROS_RENEWAL_PERIOD.getKey(), "5s");
 +        cfg.setSiteConfig(site);
 +      }
 +
 +    });
 +
 +    mac.getConfig().setNumTservers(1);
 +    mac.start();
 +    // Enabled kerberos auth
 +    Configuration conf = new Configuration(false);
 +    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
 +    UserGroupInformation.setConfiguration(conf);
 +  }
 +
 +  @After
 +  public void stopMac() throws Exception {
 +    if (null != mac) {
 +      mac.stop();
 +    }
 +  }
 +
 +  // Intentially setting the Test annotation timeout. We do not want to scale the timeout.
 +  @Test(timeout = TEST_DURATION)
 +  public void testReadAndWriteThroughTicketLifetime() throws Exception {
 +    // Attempt to use Accumulo for a duration of time that exceeds the Kerberos ticket lifetime.
 +    // This is a functional test to verify that Accumulo services renew their ticket.
 +    // If the test doesn't finish on its own, this signifies that Accumulo services failed
 +    // and the test should fail. If Accumulo services renew their ticket, the test case
 +    // should exit gracefully on its own.
 +
 +    // Login as the "root" user
 +    UserGroupInformation.loginUserFromKeytab(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath());
 +    log.info("Logged in as {}", rootUser.getPrincipal());
 +
 +    Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken());
 +    log.info("Created connector as {}", rootUser.getPrincipal());
 +    assertEquals(rootUser.getPrincipal(), conn.whoami());
 +
 +    long duration = 0;
 +    long last = System.currentTimeMillis();
 +    // Make sure we have a couple renewals happen
 +    while (duration < TICKET_TEST_LIFETIME) {
 +      // Create a table, write a record, compact, read the record, drop the table.
 +      createReadWriteDrop(conn);
 +      // Wait a bit after
 +      Thread.sleep(5000);
 +
 +      // Update the duration
 +      long now = System.currentTimeMillis();
 +      duration += now - last;
 +      last = now;
 +    }
 +  }
 +
 +  /**
 +   * Creates a table, adds a record to it, and then compacts the table. A simple way to make sure that the system user exists (since the master does an RPC to
 +   * the tserver which will create the system user if it doesn't already exist).
 +   */
 +  private void createReadWriteDrop(Connector conn) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, TableExistsException {
 +    final String table = testName.getMethodName() + "_table";
 +    conn.tableOperations().create(table);
 +    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
 +    Mutation m = new Mutation("a");
 +    m.put("b", "c", "d");
 +    bw.addMutation(m);
 +    bw.close();
 +    conn.tableOperations().compact(table, new CompactionConfig().setFlush(true).setWait(true));
 +    Scanner s = conn.createScanner(table, Authorizations.EMPTY);
 +    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
 +    assertEquals("Did not find the expected key", 0, new Key("a", "b", "c").compareTo(entry.getKey(), PartialKey.ROW_COLFAM_COLQUAL));
 +    assertEquals("d", entry.getValue().toString());
 +    conn.tableOperations().delete(table);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
index dd83574,0c2631f..49160aa
--- a/test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
@@@ -37,9 -35,7 +37,9 @@@ public class MasterFailoverIT extends A
  
    @Override
    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 -    cfg.setSiteConfig(Collections.singletonMap(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s"));
 +    Map<String,String> siteConfig = cfg.getSiteConfig();
-     siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "5s");
++    siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s");
 +    cfg.setSiteConfig(siteConfig);
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java
index 2ba6d31,b498412..d1fb9f9
--- a/test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java
@@@ -70,10 -66,12 +70,10 @@@ public class RestartIT extends Accumulo
  
    @Override
    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
 -    Map<String,String> props = new HashMap<String,String>();
 -    props.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s");
 -    props.put(Property.GC_CYCLE_DELAY.getKey(), "1s");
 -    props.put(Property.GC_CYCLE_START.getKey(), "1s");
 -    cfg.setSiteConfig(props);
 -    cfg.useMiniDFS(true);
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
 +    cfg.setProperty(Property.GC_CYCLE_START, "1s");
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
    }
  
    private static final ScannerOpts SOPTS = new ScannerOpts();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/RestartStressIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/RestartStressIT.java
index 3f7d67d,c4b3afd..68448eb
--- a/test/src/test/java/org/apache/accumulo/test/functional/RestartStressIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/RestartStressIT.java
@@@ -58,10 -55,10 +58,10 @@@ public class RestartStressIT extends Ac
      opts.put(Property.TSERV_MAXMEM.getKey(), "100K");
      opts.put(Property.TSERV_MAJC_DELAY.getKey(), "100ms");
      opts.put(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1M");
-     opts.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "5s");
+     opts.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s");
      opts.put(Property.MASTER_RECOVERY_DELAY.getKey(), "1s");
      cfg.setSiteConfig(opts);
 -    cfg.useMiniDFS(true);
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
index aec6bae,0000000..ca45382
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
@@@ -1,153 -1,0 +1,153 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.ConditionalWriter;
 +import org.apache.accumulo.core.client.ConditionalWriter.Status;
 +import org.apache.accumulo.core.client.ConditionalWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Durability;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Condition;
 +import org.apache.accumulo.core.data.ConditionalMutation;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.minicluster.impl.ProcessReference;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.junit.Test;
 +
 +import com.google.common.collect.Iterators;
 +
 +public class SessionDurabilityIT extends ConfigurableMacIT {
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.setNumTservers(1);
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +  }
 +
 +  @Test(timeout = 3 * 60 * 1000)
 +  public void nondurableTableHasDurableWrites() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    // table default has no durability
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none");
 +    // send durable writes
 +    BatchWriterConfig cfg = new BatchWriterConfig();
 +    cfg.setDurability(Durability.SYNC);
 +    writeSome(tableName, 10, cfg);
 +    assertEquals(10, count(tableName));
 +    // verify writes servive restart
 +    restartTServer();
 +    assertEquals(10, count(tableName));
 +  }
 +
 +  @Test(timeout = 3 * 60 * 1000)
 +  public void durableTableLosesNonDurableWrites() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    // table default is durable writes
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync");
 +    // write with no durability
 +    BatchWriterConfig cfg = new BatchWriterConfig();
 +    cfg.setDurability(Durability.NONE);
 +    writeSome(tableName, 10, cfg);
 +    // verify writes are lost on restart
 +    restartTServer();
 +    assertTrue(10 > count(tableName));
 +  }
 +
 +  private int count(String tableName) throws Exception {
 +    return Iterators.size(getConnector().createScanner(tableName, Authorizations.EMPTY).iterator());
 +  }
 +
 +  private void writeSome(String tableName, int n, BatchWriterConfig cfg) throws Exception {
 +    Connector c = getConnector();
 +    BatchWriter bw = c.createBatchWriter(tableName, cfg);
 +    for (int i = 0; i < n; i++) {
 +      Mutation m = new Mutation(i + "");
 +      m.put("", "", "");
 +      bw.addMutation(m);
 +    }
 +    bw.close();
 +  }
 +
 +  @Test(timeout = 3 * 60 * 1000)
 +  public void testConditionDurability() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    // table default is durable writes
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync");
 +    // write without durability
 +    ConditionalWriterConfig cfg = new ConditionalWriterConfig();
 +    cfg.setDurability(Durability.NONE);
 +    conditionWriteSome(tableName, 10, cfg);
 +    // everything in there?
 +    assertEquals(10, count(tableName));
 +    // restart the server and verify the updates are lost
 +    restartTServer();
 +    assertEquals(0, count(tableName));
 +  }
 +
 +  @Test(timeout = 3 * 60 * 1000)
 +  public void testConditionDurability2() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    // table default is durable writes
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none");
 +    // write with durability
 +    ConditionalWriterConfig cfg = new ConditionalWriterConfig();
 +    cfg.setDurability(Durability.SYNC);
 +    conditionWriteSome(tableName, 10, cfg);
 +    // everything in there?
 +    assertEquals(10, count(tableName));
 +    // restart the server and verify the updates are still there
 +    restartTServer();
 +    assertEquals(10, count(tableName));
 +  }
 +
 +  private void conditionWriteSome(String tableName, int n, ConditionalWriterConfig cfg) throws Exception {
 +    Connector c = getConnector();
 +    ConditionalWriter cw = c.createConditionalWriter(tableName, cfg);
 +    for (int i = 0; i < n; i++) {
 +      ConditionalMutation m = new ConditionalMutation((CharSequence) (i + ""), new Condition("", ""));
 +      m.put("", "", "X");
 +      assertEquals(Status.ACCEPTED, cw.write(m).getStatus());
 +    }
 +  }
 +
 +  private void restartTServer() throws Exception {
 +    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
 +      cluster.killProcess(ServerType.TABLET_SERVER, proc);
 +    }
 +    cluster.start();
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
index 44473b0,bfca75b..bc36257
--- a/test/src/test/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
@@@ -35,13 -35,15 +35,13 @@@ public class WriteAheadLogIT extends Ac
  
    @Override
    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 -    Map<String,String> siteConfig = new HashMap<String,String>();
 -    siteConfig.put(Property.TSERV_WALOG_MAX_SIZE.getKey(), "2M");
 -    siteConfig.put(Property.GC_CYCLE_DELAY.getKey(), "1");
 -    siteConfig.put(Property.GC_CYCLE_START.getKey(), "1");
 -    siteConfig.put(Property.MASTER_RECOVERY_DELAY.getKey(), "1s");
 -    siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "1");
 -    siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "4s");
 -    cfg.setSiteConfig(siteConfig);
 -    cfg.useMiniDFS(true);
 +    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
 +    cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
 +    cfg.setProperty(Property.GC_CYCLE_START, "1");
 +    cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "1s");
 +    cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "4s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
index f852901,f852901..fefb9a6
--- a/test/src/test/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
@@@ -45,7 -45,7 +45,7 @@@ public class ZookeeperRestartIT extend
    @Override
    public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
      Map<String,String> siteConfig = new HashMap<String,String>();
--    siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "3s");
++    siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s");
      cfg.setSiteConfig(siteConfig);
    }
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
index bd10f90,0000000..35bc0fe
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
@@@ -1,733 -1,0 +1,733 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.replication;
 +
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 +import org.apache.accumulo.core.protobuf.ProtobufUtil;
 +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 +import org.apache.accumulo.core.replication.ReplicationTable;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.master.replication.SequentialWorkAssigner;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.minicluster.impl.ProcessReference;
 +import org.apache.accumulo.server.replication.ReplicaSystemFactory;
 +import org.apache.accumulo.server.replication.StatusUtil;
 +import org.apache.accumulo.server.replication.proto.Replication.Status;
 +import org.apache.accumulo.test.functional.ConfigurableMacIT;
 +import org.apache.accumulo.tserver.TabletServer;
 +import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.junit.After;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Iterators;
 +
 +/**
 + * Replication tests which start at least two MAC instances and replicate data between them
 + */
 +public class MultiInstanceReplicationIT extends ConfigurableMacIT {
 +  private static final Logger log = LoggerFactory.getLogger(MultiInstanceReplicationIT.class);
 +
 +  private ExecutorService executor;
 +
 +  @Override
 +  public int defaultTimeoutSeconds() {
 +    return 10 * 60;
 +  }
 +
 +  @Before
 +  public void createExecutor() {
 +    executor = Executors.newSingleThreadExecutor();
 +  }
 +
 +  @After
 +  public void stopExecutor() {
 +    if (null != executor) {
 +      executor.shutdownNow();
 +    }
 +  }
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.setNumTservers(1);
-     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
 +    cfg.setProperty(Property.GC_CYCLE_START, "1s");
 +    cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
 +    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
 +    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
 +    cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
 +    cfg.setProperty(Property.REPLICATION_NAME, "master");
 +    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
 +    cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
 +  }
 +
 +  /**
 +   * Use the same SSL and credential provider configuration that is set up by AbstractMacIT for the other MAC used for replication
 +   */
 +  private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) {
 +    // Set the same SSL information from the primary when present
 +    Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig();
 +    if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) {
 +      Map<String,String> peerSiteConfig = new HashMap<String,String>();
 +      peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true");
 +      String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey());
 +      Assert.assertNotNull("Keystore Path was null", keystorePath);
 +      peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath);
 +      String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey());
 +      Assert.assertNotNull("Truststore Path was null", truststorePath);
 +      peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath);
 +
 +      // Passwords might be stored in CredentialProvider
 +      String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey());
 +      if (null != keystorePassword) {
 +        peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword);
 +      }
 +      String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey());
 +      if (null != truststorePassword) {
 +        peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword);
 +      }
 +
 +      System.out.println("Setting site configuration for peer " + peerSiteConfig);
 +      peerCfg.setSiteConfig(peerSiteConfig);
 +    }
 +
 +    // Use the CredentialProvider if the primary also uses one
 +    String credProvider = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey());
 +    if (null != credProvider) {
 +      Map<String,String> peerSiteConfig = peerCfg.getSiteConfig();
 +      peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider);
 +      peerCfg.setSiteConfig(peerSiteConfig);
 +    }
 +  }
 +
 +  @Test
 +  public void dataWasReplicatedToThePeer() throws Exception {
 +    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
 +        ROOT_PASSWORD);
 +    peerCfg.setNumTservers(1);
 +    peerCfg.setInstanceName("peer");
 +    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
 +
 +    updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
 +
 +    MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg);
 +
 +    peerCluster.start();
 +
 +    try {
 +      final Connector connMaster = getConnector();
 +      final Connector connPeer = peerCluster.getConnector("root", new PasswordToken(ROOT_PASSWORD));
 +
 +      ReplicationTable.setOnline(connMaster);
 +
 +      String peerUserName = "peer", peerPassword = "foo";
 +
 +      String peerClusterName = "peer";
 +
 +      connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
 +
 +      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
 +      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
 +
 +      // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
 +      connMaster.instanceOperations().setProperty(
 +          Property.REPLICATION_PEERS.getKey() + peerClusterName,
 +          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
 +              AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
 +
 +      final String masterTable = "master", peerTable = "peer";
 +
 +      connMaster.tableOperations().create(masterTable);
 +      String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
 +      Assert.assertNotNull(masterTableId);
 +
 +      connPeer.tableOperations().create(peerTable);
 +      String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
 +      Assert.assertNotNull(peerTableId);
 +
 +      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
 +
 +      // Replicate this table to the peerClusterName in a table with the peerTableId table id
 +      connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
 +      connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId);
 +
 +      // Write some data to table1
 +      BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
 +      for (int rows = 0; rows < 5000; rows++) {
 +        Mutation m = new Mutation(Integer.toString(rows));
 +        for (int cols = 0; cols < 100; cols++) {
 +          String value = Integer.toString(cols);
 +          m.put(value, "", value);
 +        }
 +        bw.addMutation(m);
 +      }
 +
 +      bw.close();
 +
 +      log.info("Wrote all data to master cluster");
 +
 +      final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
 +
 +      log.info("Files to replicate: " + filesNeedingReplication);
 +
 +      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
 +        cluster.killProcess(ServerType.TABLET_SERVER, proc);
 +      }
 +      cluster.exec(TabletServer.class);
 +
 +      log.info("TabletServer restarted");
 +      Iterators.size(ReplicationTable.getScanner(connMaster).iterator());
 +      log.info("TabletServer is online");
 +
 +      while (!ReplicationTable.isOnline(connMaster)) {
 +        log.info("Replication table still offline, waiting");
 +        Thread.sleep(5000);
 +      }
 +
 +      log.info("");
 +      log.info("Fetching metadata records:");
 +      for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
 +        if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
 +          log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
 +        } else {
 +          log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
 +        }
 +      }
 +
 +      log.info("");
 +      log.info("Fetching replication records:");
 +      for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) {
 +        log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
 +      }
 +
 +      Future<Boolean> future = executor.submit(new Callable<Boolean>() {
 +
 +        @Override
 +        public Boolean call() throws Exception {
 +          long then = System.currentTimeMillis();
 +          connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
 +          long now = System.currentTimeMillis();
 +          log.info("Drain completed in " + (now - then) + "ms");
 +          return true;
 +        }
 +
 +      });
 +
 +      try {
 +        future.get(60, TimeUnit.SECONDS);
 +      } catch (TimeoutException e) {
 +        future.cancel(true);
 +        Assert.fail("Drain did not finish within 60 seconds");
 +      } finally {
 +        executor.shutdownNow();
 +      }
 +
 +      log.info("drain completed");
 +
 +      log.info("");
 +      log.info("Fetching metadata records:");
 +      for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
 +        if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
 +          log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
 +        } else {
 +          log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
 +        }
 +      }
 +
 +      log.info("");
 +      log.info("Fetching replication records:");
 +      for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) {
 +        log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
 +      }
 +
 +      Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
 +      Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
 +      Entry<Key,Value> masterEntry = null, peerEntry = null;
 +      while (masterIter.hasNext() && peerIter.hasNext()) {
 +        masterEntry = masterIter.next();
 +        peerEntry = peerIter.next();
 +        Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
 +            masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
 +        Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
 +      }
 +
 +      log.info("Last master entry: " + masterEntry);
 +      log.info("Last peer entry: " + peerEntry);
 +
 +      Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
 +      Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
 +    } finally {
 +      peerCluster.stop();
 +    }
 +  }
 +
 +  @Test
 +  public void dataReplicatedToCorrectTable() throws Exception {
 +    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
 +        ROOT_PASSWORD);
 +    peerCfg.setNumTservers(1);
 +    peerCfg.setInstanceName("peer");
 +    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
 +
 +    updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
 +
 +    MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg);
 +
 +    peer1Cluster.start();
 +
 +    try {
 +      Connector connMaster = getConnector();
 +      Connector connPeer = peer1Cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD));
 +
 +      String peerClusterName = "peer";
 +      String peerUserName = "peer", peerPassword = "foo";
 +
 +      // Create local user
 +      connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
 +
 +      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
 +      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
 +
 +      // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
 +      connMaster.instanceOperations().setProperty(
 +          Property.REPLICATION_PEERS.getKey() + peerClusterName,
 +          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
 +              AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));
 +
 +      String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
 +
 +      // Create tables
 +      connMaster.tableOperations().create(masterTable1);
 +      String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
 +      Assert.assertNotNull(masterTableId1);
 +
 +      connMaster.tableOperations().create(masterTable2);
 +      String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
 +      Assert.assertNotNull(masterTableId2);
 +
 +      connPeer.tableOperations().create(peerTable1);
 +      String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
 +      Assert.assertNotNull(peerTableId1);
 +
 +      connPeer.tableOperations().create(peerTable2);
 +      String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
 +      Assert.assertNotNull(peerTableId2);
 +
 +      // Grant write permission
 +      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
 +      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
 +
 +      // Replicate this table to the peerClusterName in a table with the peerTableId table id
 +      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
 +      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1);
 +
 +      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
 +      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2);
 +
 +      // Write some data to table1
 +      BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
 +      long masterTable1Records = 0l;
 +      for (int rows = 0; rows < 2500; rows++) {
 +        Mutation m = new Mutation(masterTable1 + rows);
 +        for (int cols = 0; cols < 100; cols++) {
 +          String value = Integer.toString(cols);
 +          m.put(value, "", value);
 +          masterTable1Records++;
 +        }
 +        bw.addMutation(m);
 +      }
 +
 +      bw.close();
 +
 +      // Write some data to table2
 +      bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
 +      long masterTable2Records = 0l;
 +      for (int rows = 0; rows < 2500; rows++) {
 +        Mutation m = new Mutation(masterTable2 + rows);
 +        for (int cols = 0; cols < 100; cols++) {
 +          String value = Integer.toString(cols);
 +          m.put(value, "", value);
 +          masterTable2Records++;
 +        }
 +        bw.addMutation(m);
 +      }
 +
 +      bw.close();
 +
 +      log.info("Wrote all data to master cluster");
 +
 +      Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(
 +          masterTable2);
 +
 +      log.info("Files to replicate for table1: " + filesFor1);
 +      log.info("Files to replicate for table2: " + filesFor2);
 +
 +      // Restart the tserver to force a close on the WAL
 +      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
 +        cluster.killProcess(ServerType.TABLET_SERVER, proc);
 +      }
 +      cluster.exec(TabletServer.class);
 +
 +      log.info("Restarted the tserver");
 +
 +      // Read the data -- the tserver is back up and running
 +      Iterators.size(connMaster.createScanner(masterTable1, Authorizations.EMPTY).iterator());
 +
 +      while (!ReplicationTable.isOnline(connMaster)) {
 +        log.info("Replication table still offline, waiting");
 +        Thread.sleep(5000);
 +      }
 +
 +      // Wait for both tables to be replicated
 +      log.info("Waiting for {} for {}", filesFor1, masterTable1);
 +      connMaster.replicationOperations().drain(masterTable1, filesFor1);
 +
 +      log.info("Waiting for {} for {}", filesFor2, masterTable2);
 +      connMaster.replicationOperations().drain(masterTable2, filesFor2);
 +
 +      long countTable = 0l;
 +      for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
 +        countTable++;
 +        Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
 +            .startsWith(masterTable1));
 +      }
 +
 +      log.info("Found {} records in {}", countTable, peerTable1);
 +      Assert.assertEquals(masterTable1Records, countTable);
 +
 +      countTable = 0l;
 +      for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
 +        countTable++;
 +        Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
 +            .startsWith(masterTable2));
 +      }
 +
 +      log.info("Found {} records in {}", countTable, peerTable2);
 +      Assert.assertEquals(masterTable2Records, countTable);
 +
 +    } finally {
 +      peer1Cluster.stop();
 +    }
 +  }
 +
 +  @Test
 +  public void dataWasReplicatedToThePeerWithoutDrain() throws Exception {
 +    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
 +        ROOT_PASSWORD);
 +    peerCfg.setNumTservers(1);
 +    peerCfg.setInstanceName("peer");
 +    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
 +
 +    updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
 +
 +    MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg);
 +
 +    peerCluster.start();
 +
 +    Connector connMaster = getConnector();
 +    Connector connPeer = peerCluster.getConnector("root", new PasswordToken(ROOT_PASSWORD));
 +
 +    String peerUserName = "repl";
 +    String peerPassword = "passwd";
 +
 +    // Create a user on the peer for replication to use
 +    connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
 +
 +    String peerClusterName = "peer";
 +
 +    // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
 +    connMaster.instanceOperations().setProperty(
 +        Property.REPLICATION_PEERS.getKey() + peerClusterName,
 +        ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
 +            AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
 +
 +    // Configure the credentials we should use to authenticate ourselves to the peer for replication
 +    connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
 +    connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
 +
 +    String masterTable = "master", peerTable = "peer";
 +
 +    connMaster.tableOperations().create(masterTable);
 +    String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
 +    Assert.assertNotNull(masterTableId);
 +
 +    connPeer.tableOperations().create(peerTable);
 +    String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
 +    Assert.assertNotNull(peerTableId);
 +
 +    // Give our replication user the ability to write to the table
 +    connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
 +
 +    // Replicate this table to the peerClusterName in a table with the peerTableId table id
 +    connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
 +    connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId);
 +
 +    // Write some data to table1
 +    BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
 +    for (int rows = 0; rows < 5000; rows++) {
 +      Mutation m = new Mutation(Integer.toString(rows));
 +      for (int cols = 0; cols < 100; cols++) {
 +        String value = Integer.toString(cols);
 +        m.put(value, "", value);
 +      }
 +      bw.addMutation(m);
 +    }
 +
 +    bw.close();
 +
 +    log.info("Wrote all data to master cluster");
 +
 +    Set<String> files = connMaster.replicationOperations().referencedFiles(masterTable);
 +
 +    log.info("Files to replicate:" + files);
 +
 +    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
 +      cluster.killProcess(ServerType.TABLET_SERVER, proc);
 +    }
 +
 +    cluster.exec(TabletServer.class);
 +
 +    while (!ReplicationTable.isOnline(connMaster)) {
 +      log.info("Replication table still offline, waiting");
 +      Thread.sleep(5000);
 +    }
 +
 +    Iterators.size(connMaster.createScanner(masterTable, Authorizations.EMPTY).iterator());
 +
 +    for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) {
 +      log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
 +    }
 +
 +    connMaster.replicationOperations().drain(masterTable, files);
 +
 +    Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
 +    Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
 +    while (masterIter.hasNext() && peerIter.hasNext()) {
 +      Entry<Key,Value> masterEntry = masterIter.next(), peerEntry = peerIter.next();
 +      Assert.assertEquals(peerEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
 +          masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
 +      Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
 +    }
 +
 +    Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
 +    Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
 +
 +    peerCluster.stop();
 +  }
 +
 +  @Test
 +  public void dataReplicatedToCorrectTableWithoutDrain() throws Exception {
 +    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
 +        ROOT_PASSWORD);
 +    peerCfg.setNumTservers(1);
 +    peerCfg.setInstanceName("peer");
 +    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
 +
 +    updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
 +
 +    MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg);
 +
 +    peer1Cluster.start();
 +
 +    try {
 +      Connector connMaster = getConnector();
 +      Connector connPeer = peer1Cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD));
 +
 +      String peerClusterName = "peer";
 +
 +      String peerUserName = "repl";
 +      String peerPassword = "passwd";
 +
 +      // Create a user on the peer for replication to use
 +      connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
 +
 +      // Configure the credentials we should use to authenticate ourselves to the peer for replication
 +      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
 +      connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
 +
 +      // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
 +      connMaster.instanceOperations().setProperty(
 +          Property.REPLICATION_PEERS.getKey() + peerClusterName,
 +          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
 +              AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));
 +
 +      String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
 +
 +      connMaster.tableOperations().create(masterTable1);
 +      String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
 +      Assert.assertNotNull(masterTableId1);
 +
 +      connMaster.tableOperations().create(masterTable2);
 +      String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
 +      Assert.assertNotNull(masterTableId2);
 +
 +      connPeer.tableOperations().create(peerTable1);
 +      String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
 +      Assert.assertNotNull(peerTableId1);
 +
 +      connPeer.tableOperations().create(peerTable2);
 +      String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
 +      Assert.assertNotNull(peerTableId2);
 +
 +      // Give our replication user the ability to write to the tables
 +      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
 +      connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
 +
 +      // Replicate this table to the peerClusterName in a table with the peerTableId table id
 +      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
 +      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1);
 +
 +      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
 +      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2);
 +
 +      // Write some data to table1
 +      BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
 +      for (int rows = 0; rows < 2500; rows++) {
 +        Mutation m = new Mutation(masterTable1 + rows);
 +        for (int cols = 0; cols < 100; cols++) {
 +          String value = Integer.toString(cols);
 +          m.put(value, "", value);
 +        }
 +        bw.addMutation(m);
 +      }
 +
 +      bw.close();
 +
 +      // Write some data to table2
 +      bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
 +      for (int rows = 0; rows < 2500; rows++) {
 +        Mutation m = new Mutation(masterTable2 + rows);
 +        for (int cols = 0; cols < 100; cols++) {
 +          String value = Integer.toString(cols);
 +          m.put(value, "", value);
 +        }
 +        bw.addMutation(m);
 +      }
 +
 +      bw.close();
 +
 +      log.info("Wrote all data to master cluster");
 +
 +      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
 +        cluster.killProcess(ServerType.TABLET_SERVER, proc);
 +      }
 +
 +      cluster.exec(TabletServer.class);
 +
 +      while (!ReplicationTable.isOnline(connMaster)) {
 +        log.info("Replication table still offline, waiting");
 +        Thread.sleep(5000);
 +      }
 +
 +      // Wait until we fully replicated something
 +      boolean fullyReplicated = false;
 +      for (int i = 0; i < 10 && !fullyReplicated; i++) {
 +        UtilWaitThread.sleep(2000);
 +
 +        Scanner s = ReplicationTable.getScanner(connMaster);
 +        WorkSection.limit(s);
 +        for (Entry<Key,Value> entry : s) {
 +          Status status = Status.parseFrom(entry.getValue().get());
 +          if (StatusUtil.isFullyReplicated(status)) {
 +            fullyReplicated |= true;
 +          }
 +        }
 +      }
 +
 +      Assert.assertNotEquals(0, fullyReplicated);
 +
 +      // We have to wait for the master to assign the replication work, a local tserver to process it, and then the remote tserver to replay it
 +      // Be cautious in how quickly we assert that the data is present on the peer
 +      long countTable = 0l;
 +      for (int i = 0; i < 10; i++) {
 +        for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
 +          countTable++;
 +          Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
 +              .startsWith(masterTable1));
 +        }
 +
 +        log.info("Found {} records in {}", countTable, peerTable1);
 +
 +        if (0l == countTable) {
 +          Thread.sleep(5000);
 +        } else {
 +          break;
 +        }
 +      }
 +
 +      Assert.assertTrue("Found no records in " + peerTable1 + " in the peer cluster", countTable > 0);
 +
 +      // We have to wait for the master to assign the replication work, a local tserver to process it, and then the remote tserver to replay it
 +      // Be cautious in how quickly we assert that the data is present on the peer
 +      for (int i = 0; i < 10; i++) {
 +        countTable = 0l;
 +        for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
 +          countTable++;
 +          Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
 +              .startsWith(masterTable2));
 +        }
 +
 +        log.info("Found {} records in {}", countTable, peerTable2);
 +
 +        if (0l == countTable) {
 +          Thread.sleep(5000);
 +        } else {
 +          break;
 +        }
 +      }
 +
 +      Assert.assertTrue("Found no records in " + peerTable2 + " in the peer cluster", countTable > 0);
 +
 +    } finally {
 +      peer1Cluster.stop();
 +    }
 +  }
 +}