You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/04/25 01:21:13 UTC
[27/34] accumulo git commit: ACCUMULO-3423 optimize WAL metadata table updates
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
new file mode 100644
index 0000000..490bd7c
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.core.conf.Property.GC_CYCLE_DELAY;
+import static org.apache.accumulo.core.conf.Property.GC_CYCLE_START;
+import static org.apache.accumulo.core.conf.Property.INSTANCE_ZK_TIMEOUT;
+import static org.apache.accumulo.core.conf.Property.TSERV_WALOG_MAX_SIZE;
+import static org.apache.accumulo.core.conf.Property.TSERV_WAL_REPLICATION;
+import static org.apache.accumulo.core.security.Authorizations.EMPTY;
+import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR;
+import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.master.state.SetGoalState;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Sunny-day test of write-ahead log metadata handling (ACCUMULO-3423): WAL markers are created
+ * lazily, rolled logs become unused after a flush, the garbage collector removes unused WALs, and
+ * recovery markers appear after an unclean tablet server restart.
+ */
+public class WALSunnyDayIT extends ConfigurableMacIT {
+
+  private static final Text CF = new Text(new byte[0]);
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(GC_CYCLE_DELAY, "1s");
+    cfg.setProperty(GC_CYCLE_START, "0s");
+    // a small maximum WAL size forces frequent log roll-overs during ingest
+    cfg.setProperty(TSERV_WALOG_MAX_SIZE, "1M");
+    cfg.setProperty(TSERV_WAL_REPLICATION, "1");
+    cfg.setProperty(INSTANCE_ZK_TIMEOUT, "3s");
+    cfg.setNumTservers(1);
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  /** Returns the number of {@code true} values in the given collection. */
+  int countTrue(Collection<Boolean> bools) {
+    int result = 0;
+    for (Boolean b : bools) {
+      if (b.booleanValue())
+        result++;
+    }
+    return result;
+  }
+
+  @Test
+  public void test() throws Exception {
+    MiniAccumuloClusterImpl mac = getCluster();
+    MiniAccumuloClusterControl control = mac.getClusterControl();
+    // keep the GC from removing WALs until we are ready to observe collection
+    control.stop(GARBAGE_COLLECTOR);
+    Connector c = getConnector();
+    ZooKeeper zoo = new ZooKeeper(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut(), new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        log.info(event.toString());
+      }
+    });
+    try {
+      String tableName = getUniqueNames(1)[0];
+      c.tableOperations().create(tableName);
+      writeSomeData(c, tableName, 1, 1);
+
+      // wal markers are added lazily
+      Map<String,Boolean> wals = getWals(c, zoo);
+      assertEquals(wals.toString(), 1, wals.size());
+      for (Boolean b : wals.values()) {
+        assertTrue("logs should be in use", b.booleanValue());
+      }
+
+      // roll the log: a new WAL is started while the old marker remains
+      writeSomeData(c, tableName, 1000, 50);
+      Map<String,Boolean> walsAfterRoll = getWals(c, zoo);
+      assertEquals("should have 2 WALs after roll", 2, walsAfterRoll.size());
+      assertTrue("new WALs should be a superset of the old WALs", walsAfterRoll.keySet().containsAll(wals.keySet()));
+      assertEquals("all WALs should be in use", 2, countTrue(walsAfterRoll.values()));
+
+      // flush the tables
+      for (String table : new String[] {tableName, MetadataTable.NAME, RootTable.NAME}) {
+        c.tableOperations().flush(table, null, null, true);
+      }
+      UtilWaitThread.sleep(1000);
+      // rolled WAL is no longer in use, but needs to be GC'd
+      Map<String,Boolean> walsAfterFlush = getWals(c, zoo);
+      assertEquals(walsAfterFlush.toString(), 2, walsAfterFlush.size());
+      assertEquals("inUse should be 1", 1, countTrue(walsAfterFlush.values()));
+
+      // let the GC run for a little bit
+      control.start(GARBAGE_COLLECTOR);
+      UtilWaitThread.sleep(5 * 1000);
+      // make sure the unused WAL goes away
+      Map<String,Boolean> walsAfterGC = getWals(c, zoo);
+      assertEquals(walsAfterGC.toString(), 1, walsAfterGC.size());
+      control.stop(GARBAGE_COLLECTOR);
+      // restart the tserver, but don't run recovery on all tablets
+      control.stop(TABLET_SERVER);
+      // this delays recovery on the normal tables
+      assertEquals(0, cluster.exec(SetGoalState.class, "SAFE_MODE").waitFor());
+      control.start(TABLET_SERVER);
+
+      // wait for the metadata table to go back online
+      getRecoveryMarkers(c);
+      // allow a little time for the master to notice ASSIGNED_TO_DEAD_SERVER tablets
+      UtilWaitThread.sleep(5 * 1000);
+      Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c);
+      // log.debug("markers " + markers);
+      assertEquals("one tablet should have markers", 1, markers.keySet().size());
+      // NOTE: JUnit's assertEquals takes the expected value first
+      assertEquals("tableId of the keyExtent should be 1", new Text("1"), markers.keySet().iterator().next().getTableId());
+
+      // put some data in the WAL
+      assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor());
+      verifySomeData(c, tableName, 1000 * 50 + 1);
+      writeSomeData(c, tableName, 100, 100);
+
+      Map<String,Boolean> walsAfterRestart = getWals(c, zoo);
+      // log.debug("wals after " + walsAfterRestart);
+      assertEquals("used WALs after restart should be 1", 1, countTrue(walsAfterRestart.values()));
+      control.start(GARBAGE_COLLECTOR);
+      UtilWaitThread.sleep(5 * 1000);
+      Map<String,Boolean> walsAfterRestartAndGC = getWals(c, zoo);
+      assertEquals("wals left should be 1", 1, walsAfterRestartAndGC.size());
+      assertEquals("logs in use should be 1", 1, countTrue(walsAfterRestartAndGC.values()));
+    } finally {
+      // always release the ZooKeeper session, even when an assertion fails
+      zoo.close();
+    }
+  }
+
+  /** Scans the table and asserts it contains exactly {@code expected} entries. */
+  private void verifySomeData(Connector c, String tableName, int expected) throws Exception {
+    Scanner scan = c.createScanner(tableName, EMPTY);
+    int result = Iterators.size(scan.iterator());
+    scan.close();
+    Assert.assertEquals(expected, result);
+  }
+
+  /**
+   * Writes {@code row} rows of {@code col} random cells each to the given table, flushing the
+   * writer every 100 rows.
+   */
+  private void writeSomeData(Connector conn, String tableName, int row, int col) throws Exception {
+    Random rand = new Random();
+    BatchWriter bw = conn.createBatchWriter(tableName, null);
+    byte[] rowData = new byte[10];
+    byte[] cq = new byte[10];
+    byte[] value = new byte[10];
+
+    for (int r = 0; r < row; r++) {
+      rand.nextBytes(rowData);
+      Mutation m = new Mutation(rowData);
+      for (int c = 0; c < col; c++) {
+        rand.nextBytes(cq);
+        rand.nextBytes(value);
+        m.put(CF, new Text(cq), new Value(value));
+      }
+      bw.addMutation(m);
+      if (r % 100 == 0) {
+        bw.flush();
+      }
+    }
+    bw.close();
+  }
+
+  /**
+   * Returns a map from WAL path to whether the log is still in use, gathered from the current-logs
+   * sections of the root and metadata tables plus the root tablet's log entries in ZooKeeper. An
+   * empty metadata value marks an in-use log; ZooKeeper entries are always treated as in use.
+   */
+  private Map<String,Boolean> getWals(Connector c, ZooKeeper zoo) throws Exception {
+    Map<String,Boolean> result = new HashMap<>();
+    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
+    Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
+    try {
+      root.setRange(CurrentLogsSection.getRange());
+      meta.setRange(root.getRange());
+      Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
+      while (both.hasNext()) {
+        Entry<Key,Value> entry = both.next();
+        Text path = new Text();
+        CurrentLogsSection.getPath(entry.getKey(), path);
+        result.put(path.toString(), entry.getValue().get().length == 0);
+      }
+    } finally {
+      root.close();
+      meta.close();
+    }
+    String zpath = ZooUtil.getRoot(c.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+    List<String> children = zoo.getChildren(zpath, null);
+    for (String child : children) {
+      byte[] data = zoo.getData(zpath + "/" + child, null, null);
+      // decode explicitly; don't depend on the platform default charset
+      result.put(new String(data, java.nio.charset.StandardCharsets.UTF_8), true);
+    }
+    return result;
+  }
+
+  /**
+   * Returns the WAL recovery markers (log column entries) per tablet, read from the tablets
+   * sections of the root and metadata tables. Tablets with no log entries are omitted.
+   */
+  private Map<KeyExtent,List<String>> getRecoveryMarkers(Connector c) throws Exception {
+    Map<KeyExtent,List<String>> result = new HashMap<>();
+    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
+    Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
+    try {
+      root.setRange(TabletsSection.getRange());
+      root.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+      TabletColumnFamily.PREV_ROW_COLUMN.fetch(root);
+
+      meta.setRange(TabletsSection.getRange());
+      meta.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+      TabletColumnFamily.PREV_ROW_COLUMN.fetch(meta);
+
+      List<String> logs = new ArrayList<>();
+      Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
+      while (both.hasNext()) {
+        Entry<Key,Value> entry = both.next();
+        Key key = entry.getKey();
+        if (key.getColumnFamily().equals(TabletsSection.LogColumnFamily.NAME)) {
+          logs.add(key.getColumnQualifier().toString());
+        }
+        // the PREV_ROW column closes out a tablet's row; emit the logs accumulated for it, if any
+        if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && !logs.isEmpty()) {
+          KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue());
+          result.put(extent, logs);
+          logs = new ArrayList<String>();
+        }
+      }
+    } finally {
+      root.close();
+      meta.close();
+    }
+    return result;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
index 3b1dd2f..140fd59 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
@@ -52,7 +52,7 @@ public class WatchTheWatchCountIT extends ConfigurableMacIT {
String response = new String(buffer, 0, n);
long total = Long.parseLong(response.split(":")[1].trim());
assertTrue("Total watches was not greater than 500, but was " + total, total > 500);
- assertTrue("Total watches was not less than 650, but was " + total, total < 600);
+ assertTrue("Total watches was not less than 675, but was " + total, total < 675);
} finally {
socket.close();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java b/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
new file mode 100644
index 0000000..fcd1fd7
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.performance;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.test.continuous.ContinuousIngest;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ * Measures how WAL roll-overs affect ingest throughput: runs the same ingest with a small
+ * (10M, many roll-overs) and then a large (1G) maximum WAL size and compares average run times.
+ */
+public class RollWALPerformanceIT extends ConfigurableMacIT {
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty(Property.TSERV_WAL_REPLICATION, "1");
+ // small initial WAL size; raised to 1G later in testWalPerformanceOnce()
+ cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "10M");
+ cfg.setProperty(Property.TABLE_MINC_LOGS_MAX, "100");
+ cfg.setProperty(Property.GC_FILE_ARCHIVE, "false");
+ cfg.setProperty(Property.GC_CYCLE_START, "1s");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+ cfg.useMiniDFS(true);
+ }
+
+ /**
+ * Creates a table pre-split into 100 tablets, ingests 1M entries via ContinuousIngest, drops the
+ * table, and returns the ingest wall-clock time in milliseconds.
+ */
+ private long ingest() throws Exception {
+ final Connector c = getConnector();
+ final String tableName = getUniqueNames(1)[0];
+
+ log.info("Creating the table");
+ c.tableOperations().create(tableName);
+
+ log.info("Splitting the table");
+ final long SPLIT_COUNT = 100;
+ // split points are spread evenly across the positive long row space
+ final long distance = Long.MAX_VALUE / SPLIT_COUNT;
+ final SortedSet<Text> splits = new TreeSet<Text>();
+ for (int i = 1; i < SPLIT_COUNT; i++) {
+ splits.add(new Text(String.format("%016x", i * distance)));
+ }
+ c.tableOperations().addSplits(tableName, splits);
+
+ log.info("Waiting for balance");
+ c.instanceOperations().waitForBalance();
+
+ final Instance inst = c.getInstance();
+
+ log.info("Starting ingest");
+ final long start = System.currentTimeMillis();
+ final String args[] = {
+ "-i", inst.getInstanceName(),
+ "-z", inst.getZooKeepers(),
+ "-u", "root",
+ "-p", ROOT_PASSWORD,
+ "--batchThreads", "2",
+ "--table", tableName,
+ "--num", Long.toString(1000*1000), // 1M 100 byte entries
+ };
+
+ ContinuousIngest.main(args);
+ final long result = System.currentTimeMillis() - start;
+ log.debug(String.format("Finished in %,d ms", result));
+ log.debug("Dropping table");
+ c.tableOperations().delete(tableName);
+ return result;
+ }
+
+ /** Returns the mean ingest time in milliseconds over three runs. */
+ private long getAverage() throws Exception {
+ final int REPEAT = 3;
+ long totalTime = 0;
+ for (int i = 0; i < REPEAT; i++) {
+ totalTime += ingest();
+ }
+ return totalTime / REPEAT;
+ }
+
+ /**
+ * Times ingest with the configured 10M WAL, then raises the WAL max size to 1G, flushes the
+ * metadata/root tables and restarts the tservers so the new size takes effect, and times ingest
+ * again. Asserts the small-WAL run is slower, but by less than 25%.
+ */
+ private void testWalPerformanceOnce() throws Exception {
+ // get time with a small WAL, which will cause many WAL roll-overs
+ long avg1 = getAverage();
+ // use a bigger WAL max size to eliminate WAL roll-overs
+ Connector c = getConnector();
+ c.instanceOperations().setProperty(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1G");
+ c.tableOperations().flush(MetadataTable.NAME, null, null, true);
+ c.tableOperations().flush(RootTable.NAME, null, null, true);
+ // kill and restart the tservers so they pick up the new WAL size
+ for (ProcessReference tserver : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) {
+ getCluster().killProcess(ServerType.TABLET_SERVER, tserver);
+ }
+ getCluster().start();
+ long avg2 = getAverage();
+ log.info(String.format("Average run time with small WAL %,d with large WAL %,d", avg1, avg2));
+ assertTrue(avg1 > avg2);
+ double percent = (100. * avg1) / avg2;
+ log.info(String.format("Percent of large log: %.2f%%", percent));
+ assertTrue(percent < 125.);
+ }
+
+ @Test(timeout= 20 * 60 * 1000)
+ public void testWalPerformance() throws Exception {
+ testWalPerformanceOnce();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index 75f61f1..62ed9c2 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.rpc.ThriftUtil;
@@ -78,6 +79,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
cfg.setNumTservers(1);
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s");
// Wait longer to try to let the replication table come online before a cycle runs
cfg.setProperty(Property.GC_CYCLE_START, "10s");
@@ -102,18 +104,14 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- Range r = MetadataSchema.TabletsSection.getRange(tableId);
- s.setRange(r);
- s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+ s.setRange(CurrentLogsSection.getRange());
+ s.fetchColumnFamily(CurrentLogsSection.COLF);
Set<String> wals = new HashSet<String>();
for (Entry<Key,Value> entry : s) {
log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
// hostname:port/uri://path/to/wal
- String cq = entry.getKey().getColumnQualifier().toString();
- int index = cq.indexOf('/');
- // Normalize the path
- String path = new Path(cq.substring(index + 1)).toString();
+ String path = new Path(entry.getKey().getColumnQualifier().toString()).toString();
log.debug("Extracted file: " + path);
wals.add(path);
}
@@ -228,11 +226,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
- log.info("Checking to see that log entries are removed from tablet section after MinC");
- // After compaction, the log column should be gone from the tablet
- Set<String> walsAfterMinc = getWalsForTable(table);
- Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size());
-
Set<String> filesForTable = getFilesForTable(table);
Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
log.info("Files for table before MajC: {}", filesForTable);
@@ -258,14 +251,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
fileExists = fs.exists(fileToBeDeleted);
}
- // At this point in time, we *know* that the GarbageCollector has run which means that the Status
- // for our WAL should not be altered.
-
- log.info("Re-checking that WALs are still not referenced for our table");
-
- Set<String> walsAfterMajc = getWalsForTable(table);
- Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size());
-
Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
@@ -326,11 +311,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
- log.info("Checking to see that log entries are removed from tablet section after MinC");
- // After compaction, the log column should be gone from the tablet
- Set<String> walsAfterMinc = getWalsForTable(table);
- Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size());
-
Set<String> filesForTable = getFilesForTable(table);
Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
log.info("Files for table before MajC: {}", filesForTable);
@@ -359,11 +339,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
// At this point in time, we *know* that the GarbageCollector has run which means that the Status
// for our WAL should not be altered.
- log.info("Re-checking that WALs are still not referenced for our table");
-
- Set<String> walsAfterMajc = getWalsForTable(table);
- Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size());
-
Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
index 9dec16e..7a017e1 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
@@ -146,7 +146,7 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
}
}
- @Test
+ @Test(timeout = 10 * 60 * 1000)
public void dataWasReplicatedToThePeer() throws Exception {
MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
ROOT_PASSWORD);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 46e3ac1..3c1cbeb 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -16,11 +16,12 @@
*/
package org.apache.accumulo.test.replication;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
@@ -45,6 +46,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
@@ -63,6 +65,7 @@ import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooCache;
@@ -71,7 +74,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.gc.SimpleGarbageCollector;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.replication.ReplicaSystemFactory;
import org.apache.accumulo.server.replication.StatusCombiner;
import org.apache.accumulo.server.replication.StatusFormatter;
@@ -79,7 +82,6 @@ import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.util.ReplicationTableUtil;
import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.accumulo.tserver.TabletServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -123,25 +125,38 @@ public class ReplicationIT extends ConfigurableMacIT {
cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s");
cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s");
cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
cfg.setNumTservers(1);
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
- Multimap<String,String> logs = HashMultimap.create();
+ // Map of server to tableId
+ Multimap<TServerInstance, String> serverToTableID = HashMultimap.create();
Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- scanner.fetchColumnFamily(LogColumnFamily.NAME);
- scanner.setRange(new Range());
+ scanner.setRange(MetadataSchema.TabletsSection.getRange());
+ scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+ for (Entry<Key,Value> entry : scanner) {
+ TServerInstance key = new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
+ byte[] tableId = KeyExtent.tableOfMetadataRow(entry.getKey().getRow());
+ serverToTableID.put(key, new String(tableId, UTF_8));
+ }
+ // Map of logs to tableId
+ Multimap<String,String> logs = HashMultimap.create();
+ scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
for (Entry<Key,Value> entry : scanner) {
if (Thread.interrupted()) {
return logs;
}
-
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-
- for (String log : logEntry.logSet) {
- // Need to normalize the log file from LogEntry
- logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
+ Text path = new Text();
+ MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+ Text session = new Text();
+ Text hostPort = new Text();
+ MetadataSchema.CurrentLogsSection.getTabletServer(entry.getKey(), hostPort , session);
+ TServerInstance server = new TServerInstance(AddressUtil.parseAddress(hostPort.toString(), false), session.toString());
+ for (String tableId : serverToTableID.get(server)) {
+ logs.put(new Path(path.toString()).toString(), tableId);
}
}
return logs;
@@ -296,10 +311,12 @@ public class ReplicationIT extends ConfigurableMacIT {
attempts = 5;
while (wals.isEmpty() && attempts > 0) {
s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+ s.setRange(MetadataSchema.CurrentLogsSection.getRange());
+ s.fetchColumnFamily(MetadataSchema.CurrentLogsSection.COLF);
for (Entry<Key,Value> entry : s) {
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
- wals.add(new Path(logEntry.filename).toString());
+ Text path = new Text();
+ MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+ wals.add(new Path(path.toString()).toString());
}
attempts--;
}
@@ -330,18 +347,7 @@ public class ReplicationIT extends ConfigurableMacIT {
Assert.assertFalse(ReplicationTable.isOnline(conn));
for (String table : tables) {
- BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
-
- for (int j = 0; j < 5; j++) {
- Mutation m = new Mutation(Integer.toString(j));
- for (int k = 0; k < 5; k++) {
- String value = Integer.toString(k);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table, 5, 5);
}
// After writing data, still no replication table
@@ -381,18 +387,7 @@ public class ReplicationIT extends ConfigurableMacIT {
Assert.assertFalse(ReplicationTable.isOnline(conn));
// Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-
- for (int rows = 0; rows < 50; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 50, 50);
// After the commit for these mutations finishes, we'll get a replication entry in accumulo.metadata for table1
// Don't want to compact table1 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
@@ -439,18 +434,7 @@ public class ReplicationIT extends ConfigurableMacIT {
conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
// Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-
- for (int rows = 0; rows < 50; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table2, 50, 50);
// After the commit on these mutations, we'll get a replication entry in accumulo.metadata for table2
// Don't want to compact table2 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
@@ -498,6 +482,19 @@ public class ReplicationIT extends ConfigurableMacIT {
Assert.assertFalse("Expected to only find two elements in replication table", iter.hasNext());
}
+ private void writeSomeData(Connector conn, String table, int rows, int cols) throws Exception {
+ BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+ for (int row = 0; row < rows; row++) {
+ Mutation m = new Mutation(Integer.toString(row));
+ for (int col = 0; col < cols; col++) {
+ String value = Integer.toString(col);
+ m.put(value, "", value);
+ }
+ bw.addMutation(m);
+ }
+ bw.close();
+ }
+
@Test
public void replicationEntriesPrecludeWalDeletion() throws Exception {
final Connector conn = getConnector();
@@ -529,53 +526,21 @@ public class ReplicationIT extends ConfigurableMacIT {
Thread.sleep(2000);
// Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 200, 500);
conn.tableOperations().create(table2);
conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
Thread.sleep(2000);
- // Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table2, 200, 500);
conn.tableOperations().create(table3);
conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
Thread.sleep(2000);
- // Write some data to table3
- bw = conn.createBatchWriter(table3, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table3, 200, 500);
// Force a write to metadata for the data written
for (String table : Arrays.asList(table1, table2, table3)) {
@@ -609,7 +574,8 @@ public class ReplicationIT extends ConfigurableMacIT {
// We should have *some* reference to each log that was seen in the metadata table
// They might not yet all be closed though (might be newfile)
- Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
+ Assert.assertTrue("Metadata log distribution: " + logs + "replFiles " + replFiles, logs.keySet().containsAll(replFiles));
+ Assert.assertTrue("Difference between replication entries and current logs is bigger than one", logs.keySet().size() - replFiles.size() <= 1);
for (String replFile : replFiles) {
Path p = new Path(replFile);
@@ -697,44 +663,11 @@ public class ReplicationIT extends ConfigurableMacIT {
conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
- // Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
+ writeSomeData(conn, table1, 200, 500);
- bw.close();
+ writeSomeData(conn, table2, 200, 500);
- // Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Write some data to table3
- bw = conn.createBatchWriter(table3, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table3, 200, 500);
// Flush everything to try to make the replication records
for (String table : Arrays.asList(table1, table2, table3)) {
@@ -789,10 +722,7 @@ public class ReplicationIT extends ConfigurableMacIT {
Set<String> wals = new HashSet<>();
for (Entry<Key,Value> entry : s) {
LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
- for (String file : logEntry.logSet) {
- Path p = new Path(file);
- wals.add(p.toString());
- }
+ wals.add(new Path(logEntry.filename).toString());
}
log.warn("Found wals {}", wals);
@@ -869,9 +799,7 @@ public class ReplicationIT extends ConfigurableMacIT {
public void singleTableWithSingleTarget() throws Exception {
// We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons
// against expected Status messages.
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
- cluster.killProcess(ServerType.GARBAGE_COLLECTOR, proc);
- }
+ getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
Connector conn = getConnector();
String table1 = "table1";
@@ -905,17 +833,7 @@ public class ReplicationIT extends ConfigurableMacIT {
}
// Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 2000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 2000, 50);
// Make sure the replication table is online at this point
boolean online = ReplicationTable.isOnline(conn);
@@ -1002,17 +920,7 @@ public class ReplicationIT extends ConfigurableMacIT {
}
// Write some more data so that we over-run the single WAL
- bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 3000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 3000, 50);
log.info("Issued compaction for table");
conn.tableOperations().compact(table1, null, null, true, true);
@@ -1085,17 +993,7 @@ public class ReplicationIT extends ConfigurableMacIT {
}
// Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 2000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 2000, 50);
conn.tableOperations().flush(table1, null, null, true);
String tableId = conn.tableOperations().tableIdMap().get(table1);
@@ -1151,10 +1049,7 @@ public class ReplicationIT extends ConfigurableMacIT {
@Test
public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception {
- Collection<ProcessReference> gcProcs = cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR);
- for (ProcessReference ref : gcProcs) {
- cluster.killProcess(ServerType.GARBAGE_COLLECTOR, ref);
- }
+ getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
final Connector conn = getConnector();
@@ -1185,7 +1080,6 @@ public class ReplicationIT extends ConfigurableMacIT {
String table1 = "table1", table2 = "table2", table3 = "table3";
- BatchWriter bw;
try {
conn.tableOperations().create(table1);
conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
@@ -1194,51 +1088,19 @@ public class ReplicationIT extends ConfigurableMacIT {
ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
// Write some data to table1
- bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 200, 500);
conn.tableOperations().create(table2);
conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
- // Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table2, 200, 500);
conn.tableOperations().create(table3);
conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
- // Write some data to table3
- bw = conn.createBatchWriter(table3, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table3, 200, 500);
// Flush everything to try to make the replication records
for (String table : Arrays.asList(table1, table2, table3)) {
@@ -1252,11 +1114,8 @@ public class ReplicationIT extends ConfigurableMacIT {
// Kill the tserver(s) and restart them
// to ensure that the WALs we previously observed all move to closed.
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
- cluster.killProcess(ServerType.TABLET_SERVER, proc);
- }
-
- cluster.exec(TabletServer.class);
+ cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
+ cluster.getClusterControl().start(ServerType.TABLET_SERVER);
// Make sure we can read all the tables (recovery complete)
for (String table : Arrays.asList(table1, table2, table3)) {
@@ -1359,9 +1218,7 @@ public class ReplicationIT extends ConfigurableMacIT {
@Test
public void replicatedStatusEntriesAreDeleted() throws Exception {
// Just stop it now, we'll restart it after we restart the tserver
- for (ProcessReference proc : getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
- getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, proc);
- }
+ getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
final Connector conn = getConnector();
log.info("Got connector to MAC");
@@ -1397,17 +1254,7 @@ public class ReplicationIT extends ConfigurableMacIT {
Assert.assertNotNull("Could not determine table id for " + table1, tableId);
// Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 2000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 2000, 50);
conn.tableOperations().flush(table1, null, null, true);
// Make sure the replication table exists at this point
@@ -1425,14 +1272,35 @@ public class ReplicationIT extends ConfigurableMacIT {
// Grant ourselves the write permission for later
conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+ log.info("Checking for replication entries in replication");
+ // Then we need to get those records over to the replication table
+ Scanner s;
+ Set<String> entries = new HashSet<>();
+ for (int i = 0; i < 5; i++) {
+ s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(ReplicationSection.getRange());
+ entries.clear();
+ for (Entry<Key,Value> entry : s) {
+ entries.add(entry.getKey().getRow().toString());
+ log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+ }
+ if (!entries.isEmpty()) {
+ log.info("Replication entries {}", entries);
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ Assert.assertFalse("Did not find any replication entries in the replication table", entries.isEmpty());
+
// Find the WorkSection record that will be created for that data we ingested
boolean notFound = true;
- Scanner s;
for (int i = 0; i < 10 && notFound; i++) {
try {
s = ReplicationTable.getScanner(conn);
WorkSection.limit(s);
Entry<Key,Value> e = Iterables.getOnlyElement(s);
+ log.info("Found entry: " + e.getKey().toStringNoTruncate());
Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
notFound = false;
@@ -1483,14 +1351,13 @@ public class ReplicationIT extends ConfigurableMacIT {
log.info("Killing tserver");
// Kill the tserver(s) and restart them
// to ensure that the WALs we previously observed all move to closed.
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
- cluster.killProcess(ServerType.TABLET_SERVER, proc);
- }
+ cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
log.info("Starting tserver");
- cluster.exec(TabletServer.class);
+ cluster.getClusterControl().start(ServerType.TABLET_SERVER);
log.info("Waiting to read tables");
+ UtilWaitThread.sleep(2 * 3 * 1000);
// Make sure we can read all the tables (recovery complete)
for (String table : new String[] {MetadataTable.NAME, table1}) {
@@ -1499,55 +1366,48 @@ public class ReplicationIT extends ConfigurableMacIT {
Entry<Key,Value> entry : s) {}
}
- log.info("Checking for replication entries in replication");
- // Then we need to get those records over to the replication table
- boolean foundResults = false;
- for (int i = 0; i < 5; i++) {
- s = ReplicationTable.getScanner(conn);
- int count = 0;
- for (Entry<Key,Value> entry : s) {
- count++;
- log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
- }
- if (count > 0) {
- foundResults = true;
- break;
- }
- Thread.sleep(1000);
+ log.info("Recovered metadata:");
+ s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ for (Entry<Key,Value> entry : s) {
+ log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
}
- Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
-
- getCluster().exec(SimpleGarbageCollector.class);
+ cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
// Wait for a bit since the GC has to run (should be running after a one second delay)
waitForGCLock(conn);
Thread.sleep(1000);
+ log.info("After GC");
+ s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ for (Entry<Key,Value> entry : s) {
+ log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+ }
+
// We expect no records in the metadata table after compaction. We have to poll
// because we have to wait for the StatusMaker's next iteration which will clean
// up the dangling *closed* records after we create the record in the replication table.
// We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
log.info("Checking metadata table for replication entries");
- foundResults = true;
+ Set<String> remaining = new HashSet<>();
for (int i = 0; i < 10; i++) {
s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
s.setRange(ReplicationSection.getRange());
- long size = 0;
+ remaining.clear();
for (Entry<Key,Value> e : s) {
- size++;
- log.info("{}={}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get())));
+ remaining.add(e.getKey().getRow().toString());
}
- if (size == 0) {
- foundResults = false;
+ remaining.retainAll(entries);
+ if (remaining.isEmpty()) {
break;
}
+ log.info("remaining {}", remaining);
Thread.sleep(2000);
log.info("");
}
- Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults);
+ Assert.assertTrue("Replication status messages were not cleaned up from metadata table", remaining.isEmpty());
/**
* After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be
@@ -1560,10 +1420,10 @@ public class ReplicationIT extends ConfigurableMacIT {
recordsFound = 0;
for (Entry<Key,Value> entry : s) {
recordsFound++;
- log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+ log.info("{} {}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
}
- if (0 == recordsFound) {
+ if (recordsFound <= 2) {
break;
} else {
Thread.sleep(1000);
@@ -1571,6 +1431,6 @@ public class ReplicationIT extends ConfigurableMacIT {
}
}
- Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
+ Assert.assertTrue("Found unexpected replication records in the replication table", recordsFound <= 2);
}
}