You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/04/25 01:20:27 UTC
[1/5] accumulo git commit: ACCUMULO-3423 optimize WAL metadata table
updates
Repository: accumulo
Updated Branches:
refs/heads/1.7 ea25e98c9 -> 55981ad88
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
new file mode 100644
index 0000000..490bd7c
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.conf.Property.GC_CYCLE_DELAY;
+import static org.apache.accumulo.core.conf.Property.GC_CYCLE_START;
+import static org.apache.accumulo.core.conf.Property.INSTANCE_ZK_TIMEOUT;
+import static org.apache.accumulo.core.conf.Property.TSERV_WALOG_MAX_SIZE;
+import static org.apache.accumulo.core.conf.Property.TSERV_WAL_REPLICATION;
+import static org.apache.accumulo.core.security.Authorizations.EMPTY;
+import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR;
+import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.master.state.SetGoalState;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class WALSunnyDayIT extends ConfigurableMacIT {
+
+ private static final Text CF = new Text(new byte[0]);
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty(GC_CYCLE_DELAY, "1s");
+ cfg.setProperty(GC_CYCLE_START, "0s");
+ cfg.setProperty(TSERV_WALOG_MAX_SIZE, "1M");
+ cfg.setProperty(TSERV_WAL_REPLICATION, "1");
+ cfg.setProperty(INSTANCE_ZK_TIMEOUT, "3s");
+ cfg.setNumTservers(1);
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ int countTrue(Collection<Boolean> bools) {
+ int result = 0;
+ for (Boolean b : bools) {
+ if (b.booleanValue())
+ result ++;
+ }
+ return result;
+ }
+
+ @Test
+ public void test() throws Exception {
+ MiniAccumuloClusterImpl mac = getCluster();
+ MiniAccumuloClusterControl control = mac.getClusterControl();
+ control.stop(GARBAGE_COLLECTOR);
+ Connector c = getConnector();
+ ZooKeeper zoo = new ZooKeeper(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut(), new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ log.info(event.toString());
+ }
+ });
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+ writeSomeData(c, tableName, 1, 1);
+
+ // wal markers are added lazily
+ Map<String,Boolean> wals = getWals(c, zoo);
+ assertEquals(wals.toString(), 1, wals.size());
+ for (Boolean b : wals.values()) {
+ assertTrue("logs should be in use", b.booleanValue());
+ }
+
+ // roll log, get a new next
+ writeSomeData(c, tableName, 1000, 50);
+ Map<String,Boolean> walsAfterRoll = getWals(c, zoo);
+ assertEquals("should have 3 WALs after roll", 2, walsAfterRoll.size());
+ assertTrue("new WALs should be a superset of the old WALs", walsAfterRoll.keySet().containsAll(wals.keySet()));
+ assertEquals("all WALs should be in use", 2, countTrue(walsAfterRoll.values()));
+
+ // flush the tables
+ for (String table: new String[] { tableName, MetadataTable.NAME, RootTable.NAME} ) {
+ c.tableOperations().flush(table, null, null, true);
+ }
+ UtilWaitThread.sleep(1000);
+ // rolled WAL is no longer in use, but needs to be GC'd
+ Map<String,Boolean> walsAfterflush = getWals(c, zoo);
+ assertEquals(walsAfterflush.toString(), 2, walsAfterflush.size());
+ assertEquals("inUse should be 1", 1, countTrue(walsAfterflush.values()));
+
+ // let the GC run for a little bit
+ control.start(GARBAGE_COLLECTOR);
+ UtilWaitThread.sleep(5 * 1000);
+ // make sure the unused WAL goes away
+ Map<String,Boolean> walsAfterGC = getWals(c, zoo);
+ assertEquals(walsAfterGC.toString(), 1, walsAfterGC.size());
+ control.stop(GARBAGE_COLLECTOR);
+ // restart the tserver, but don't run recovery on all tablets
+ control.stop(TABLET_SERVER);
+ // this delays recovery on the normal tables
+ assertEquals(0, cluster.exec(SetGoalState.class, "SAFE_MODE").waitFor());
+ control.start(TABLET_SERVER);
+
+ // wait for the metadata table to go back online
+ getRecoveryMarkers(c);
+ // allow a little time for the master to notice ASSIGNED_TO_DEAD_SERVER tablets
+ UtilWaitThread.sleep(5 * 1000);
+ Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c);
+ //log.debug("markers " + markers);
+ assertEquals("one tablet should have markers", 1, markers.keySet().size());
+ assertEquals("tableId of the keyExtent should be 1", markers.keySet().iterator().next().getTableId(), new Text("1"));
+
+ // put some data in the WAL
+ assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor());
+ verifySomeData(c, tableName, 1000 * 50 + 1);
+ writeSomeData(c, tableName, 100, 100);
+
+ Map<String,Boolean> walsAfterRestart = getWals(c, zoo);
+ //log.debug("wals after " + walsAfterRestart);
+ assertEquals("used WALs after restart should be 1", 1, countTrue(walsAfterRestart.values()));
+ control.start(GARBAGE_COLLECTOR);
+ UtilWaitThread.sleep(5 * 1000);
+ Map<String,Boolean> walsAfterRestartAndGC = getWals(c, zoo);
+ assertEquals("wals left should be 1", 1, walsAfterRestartAndGC.size());
+ assertEquals("logs in use should be 1", 1, countTrue(walsAfterRestartAndGC.values()));
+ }
+
+ private void verifySomeData(Connector c, String tableName, int expected) throws Exception {
+ Scanner scan = c.createScanner(tableName, EMPTY);
+ int result = Iterators.size(scan.iterator());
+ scan.close();
+ Assert.assertEquals(expected, result);
+ }
+
+ private void writeSomeData(Connector conn, String tableName, int row, int col) throws Exception {
+ Random rand = new Random();
+ BatchWriter bw = conn.createBatchWriter(tableName, null);
+ byte[] rowData = new byte[10];
+ byte[] cq = new byte[10];
+ byte[] value = new byte[10];
+
+ for (int r = 0; r < row; r++) {
+ rand.nextBytes(rowData);
+ Mutation m = new Mutation(rowData);
+ for (int c = 0; c < col; c++) {
+ rand.nextBytes(cq);
+ rand.nextBytes(value);
+ m.put(CF, new Text(cq), new Value(value));
+ }
+ bw.addMutation(m);
+ if (r % 100 == 0) {
+ bw.flush();
+ }
+ }
+ bw.close();
+ }
+
+ private Map<String, Boolean> getWals(Connector c, ZooKeeper zoo) throws Exception {
+ Map<String, Boolean> result = new HashMap<>();
+ Scanner root = c.createScanner(RootTable.NAME, EMPTY);
+ root.setRange(CurrentLogsSection.getRange());
+ Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
+ meta.setRange(root.getRange());
+ Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
+ while (both.hasNext()) {
+ Entry<Key,Value> entry = both.next();
+ Text path = new Text();
+ CurrentLogsSection.getPath(entry.getKey(), path);
+ result.put(path.toString(), entry.getValue().get().length == 0);
+ }
+ String zpath = ZooUtil.getRoot(c.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+ List<String> children = zoo.getChildren(zpath, null);
+ for (String child : children) {
+ byte[] data = zoo.getData(zpath + "/" + child, null, null);
+ result.put(new String(data), true);
+ }
+ return result;
+ }
+
+ private Map<KeyExtent, List<String>> getRecoveryMarkers(Connector c) throws Exception {
+ Map<KeyExtent, List<String>> result = new HashMap<>();
+ Scanner root = c.createScanner(RootTable.NAME, EMPTY);
+ root.setRange(TabletsSection.getRange());
+ root.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+ TabletColumnFamily.PREV_ROW_COLUMN.fetch(root);
+
+ Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
+ meta.setRange(TabletsSection.getRange());
+ meta.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+ TabletColumnFamily.PREV_ROW_COLUMN.fetch(meta);
+
+ List<String> logs = new ArrayList<>();
+ Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
+ while (both.hasNext()) {
+ Entry<Key,Value> entry = both.next();
+ Key key = entry.getKey();
+ if (key.getColumnFamily().equals(TabletsSection.LogColumnFamily.NAME)) {
+ logs.add(key.getColumnQualifier().toString());
+ }
+ if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && !logs.isEmpty()) {
+ KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue());
+ result.put(extent, logs);
+ logs = new ArrayList<String>();
+ }
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
index 3b1dd2f..140fd59 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
@@ -52,7 +52,7 @@ public class WatchTheWatchCountIT extends ConfigurableMacIT {
String response = new String(buffer, 0, n);
long total = Long.parseLong(response.split(":")[1].trim());
assertTrue("Total watches was not greater than 500, but was " + total, total > 500);
- assertTrue("Total watches was not less than 650, but was " + total, total < 600);
+ assertTrue("Total watches was not less than 675, but was " + total, total < 675);
} finally {
socket.close();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java b/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
new file mode 100644
index 0000000..fcd1fd7
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.performance;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.test.continuous.ContinuousIngest;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ * Performance test that compares continuous-ingest run time with a small write-ahead log (many WAL
+ * roll-overs) against run time with a large write-ahead log (no roll-overs).
+ */
+public class RollWALPerformanceIT extends ConfigurableMacIT {
+
+  // the ingest table gets SPLIT_COUNT - 1 split points
+  private static final long SPLIT_COUNT = 100;
+  // number of ingest runs averaged for each measurement
+  private static final int REPEAT = 3;
+
+  /**
+   * Configures a mini-DFS backed cluster with a 10M WAL size limit, WAL replication of 1, up to 100
+   * logs before minor compaction, no GC file archiving, and a 1s GC start and cycle delay.
+   */
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.TSERV_WAL_REPLICATION, "1");
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "10M");
+    cfg.setProperty(Property.TABLE_MINC_LOGS_MAX, "100");
+    cfg.setProperty(Property.GC_FILE_ARCHIVE, "false");
+    cfg.setProperty(Property.GC_CYCLE_START, "1s");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+    cfg.useMiniDFS(true);
+  }
+
+  /**
+   * Creates and pre-splits a fresh table, runs ContinuousIngest against it, then drops the table.
+   *
+   * @return wall-clock time of the ContinuousIngest run in milliseconds; table creation, splitting
+   *         and balancing are not included
+   */
+  private long ingest() throws Exception {
+    final Connector c = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+
+    log.info("Creating the table");
+    c.tableOperations().create(tableName);
+
+    log.info("Splitting the table");
+    final long distance = Long.MAX_VALUE / SPLIT_COUNT;
+    final SortedSet<Text> splits = new TreeSet<Text>();
+    for (int i = 1; i < SPLIT_COUNT; i++) {
+      splits.add(new Text(String.format("%016x", i * distance)));
+    }
+    c.tableOperations().addSplits(tableName, splits);
+
+    log.info("Waiting for balance");
+    c.instanceOperations().waitForBalance();
+
+    final Instance inst = c.getInstance();
+
+    log.info("Starting ingest");
+    final long start = System.currentTimeMillis();
+    final String[] args = {
+      "-i", inst.getInstanceName(),
+      "-z", inst.getZooKeepers(),
+      "-u", "root",
+      "-p", ROOT_PASSWORD,
+      "--batchThreads", "2",
+      "--table", tableName,
+      "--num", Long.toString(1000 * 1000), // 1M 100 byte entries
+    };
+
+    ContinuousIngest.main(args);
+    final long result = System.currentTimeMillis() - start;
+    log.debug(String.format("Finished in %,d ms", result));
+    log.debug("Dropping table");
+    c.tableOperations().delete(tableName);
+    return result;
+  }
+
+  /**
+   * Runs {@link #ingest()} {@code REPEAT} times.
+   *
+   * @return the mean run time in milliseconds, truncated by integer division
+   */
+  private long getAverage() throws Exception {
+    long totalTime = 0;
+    for (int i = 0; i < REPEAT; i++) {
+      totalTime += ingest();
+    }
+    return totalTime / REPEAT;
+  }
+
+  /**
+   * Measures average ingest time with the configured small WAL, raises the WAL size limit to 1G,
+   * restarts the tablet servers, measures again, and asserts that the small-WAL run was slower but
+   * took less than 125% of the large-WAL time.
+   */
+  private void testWalPerformanceOnce() throws Exception {
+    // get time with a small WAL, which will cause many WAL roll-overs
+    long avg1 = getAverage();
+    // use a bigger WAL max size to eliminate WAL roll-overs
+    Connector c = getConnector();
+    c.instanceOperations().setProperty(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1G");
+    c.tableOperations().flush(MetadataTable.NAME, null, null, true);
+    c.tableOperations().flush(RootTable.NAME, null, null, true);
+    for (ProcessReference tserver : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) {
+      getCluster().killProcess(ServerType.TABLET_SERVER, tserver);
+    }
+    getCluster().start();
+    long avg2 = getAverage();
+    log.info(String.format("Average run time with small WAL %,d with large WAL %,d", avg1, avg2));
+    // carry the timings in the failure message so a failed run can be diagnosed from the report alone
+    assertTrue(String.format("Expected small WAL run (%,d ms) to be slower than large WAL run (%,d ms)", avg1, avg2), avg1 > avg2);
+    double percent = (100. * avg1) / avg2;
+    log.info(String.format("Percent of large log: %.2f%%", percent));
+    assertTrue(String.format("Small WAL run took %.2f%% of the large WAL run time, expected less than 125%%", percent), percent < 125.);
+  }
+
+  /**
+   * Runs the small-WAL versus large-WAL comparison once, with a 20 minute timeout.
+   */
+  @Test(timeout = 20 * 60 * 1000)
+  public void testWalPerformance() throws Exception {
+    testWalPerformanceOnce();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index 75f61f1..62ed9c2 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.rpc.ThriftUtil;
@@ -78,6 +79,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
cfg.setNumTservers(1);
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s");
// Wait longer to try to let the replication table come online before a cycle runs
cfg.setProperty(Property.GC_CYCLE_START, "10s");
@@ -102,18 +104,14 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- Range r = MetadataSchema.TabletsSection.getRange(tableId);
- s.setRange(r);
- s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+ s.setRange(CurrentLogsSection.getRange());
+ s.fetchColumnFamily(CurrentLogsSection.COLF);
Set<String> wals = new HashSet<String>();
for (Entry<Key,Value> entry : s) {
log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
// hostname:port/uri://path/to/wal
- String cq = entry.getKey().getColumnQualifier().toString();
- int index = cq.indexOf('/');
- // Normalize the path
- String path = new Path(cq.substring(index + 1)).toString();
+ String path = new Path(entry.getKey().getColumnQualifier().toString()).toString();
log.debug("Extracted file: " + path);
wals.add(path);
}
@@ -228,11 +226,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
- log.info("Checking to see that log entries are removed from tablet section after MinC");
- // After compaction, the log column should be gone from the tablet
- Set<String> walsAfterMinc = getWalsForTable(table);
- Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size());
-
Set<String> filesForTable = getFilesForTable(table);
Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
log.info("Files for table before MajC: {}", filesForTable);
@@ -258,14 +251,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
fileExists = fs.exists(fileToBeDeleted);
}
- // At this point in time, we *know* that the GarbageCollector has run which means that the Status
- // for our WAL should not be altered.
-
- log.info("Re-checking that WALs are still not referenced for our table");
-
- Set<String> walsAfterMajc = getWalsForTable(table);
- Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size());
-
Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
@@ -326,11 +311,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
- log.info("Checking to see that log entries are removed from tablet section after MinC");
- // After compaction, the log column should be gone from the tablet
- Set<String> walsAfterMinc = getWalsForTable(table);
- Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size());
-
Set<String> filesForTable = getFilesForTable(table);
Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
log.info("Files for table before MajC: {}", filesForTable);
@@ -359,11 +339,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
// At this point in time, we *know* that the GarbageCollector has run which means that the Status
// for our WAL should not be altered.
- log.info("Re-checking that WALs are still not referenced for our table");
-
- Set<String> walsAfterMajc = getWalsForTable(table);
- Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size());
-
Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
index 9dec16e..7a017e1 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
@@ -146,7 +146,7 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
}
}
- @Test
+ @Test(timeout = 10 * 60 * 1000)
public void dataWasReplicatedToThePeer() throws Exception {
MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
ROOT_PASSWORD);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 46e3ac1..3c1cbeb 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -16,11 +16,12 @@
*/
package org.apache.accumulo.test.replication;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
@@ -45,6 +46,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
@@ -63,6 +65,7 @@ import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooCache;
@@ -71,7 +74,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.gc.SimpleGarbageCollector;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.replication.ReplicaSystemFactory;
import org.apache.accumulo.server.replication.StatusCombiner;
import org.apache.accumulo.server.replication.StatusFormatter;
@@ -79,7 +82,6 @@ import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.util.ReplicationTableUtil;
import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.accumulo.tserver.TabletServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -123,25 +125,38 @@ public class ReplicationIT extends ConfigurableMacIT {
cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s");
cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s");
cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
cfg.setNumTservers(1);
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
- Multimap<String,String> logs = HashMultimap.create();
+ // Map of server to tableId
+ Multimap<TServerInstance, String> serverToTableID = HashMultimap.create();
Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- scanner.fetchColumnFamily(LogColumnFamily.NAME);
- scanner.setRange(new Range());
+ scanner.setRange(MetadataSchema.TabletsSection.getRange());
+ scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+ for (Entry<Key,Value> entry : scanner) {
+ TServerInstance key = new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
+ byte[] tableId = KeyExtent.tableOfMetadataRow(entry.getKey().getRow());
+ serverToTableID.put(key, new String(tableId, UTF_8));
+ }
+ // Map of logs to tableId
+ Multimap<String,String> logs = HashMultimap.create();
+ scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
for (Entry<Key,Value> entry : scanner) {
if (Thread.interrupted()) {
return logs;
}
-
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-
- for (String log : logEntry.logSet) {
- // Need to normalize the log file from LogEntry
- logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
+ Text path = new Text();
+ MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+ Text session = new Text();
+ Text hostPort = new Text();
+ MetadataSchema.CurrentLogsSection.getTabletServer(entry.getKey(), hostPort , session);
+ TServerInstance server = new TServerInstance(AddressUtil.parseAddress(hostPort.toString(), false), session.toString());
+ for (String tableId : serverToTableID.get(server)) {
+ logs.put(new Path(path.toString()).toString(), tableId);
}
}
return logs;
@@ -296,10 +311,12 @@ public class ReplicationIT extends ConfigurableMacIT {
attempts = 5;
while (wals.isEmpty() && attempts > 0) {
s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+ s.setRange(MetadataSchema.CurrentLogsSection.getRange());
+ s.fetchColumnFamily(MetadataSchema.CurrentLogsSection.COLF);
for (Entry<Key,Value> entry : s) {
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
- wals.add(new Path(logEntry.filename).toString());
+ Text path = new Text();
+ MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+ wals.add(new Path(path.toString()).toString());
}
attempts--;
}
@@ -330,18 +347,7 @@ public class ReplicationIT extends ConfigurableMacIT {
Assert.assertFalse(ReplicationTable.isOnline(conn));
for (String table : tables) {
- BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
-
- for (int j = 0; j < 5; j++) {
- Mutation m = new Mutation(Integer.toString(j));
- for (int k = 0; k < 5; k++) {
- String value = Integer.toString(k);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table, 5, 5);
}
// After writing data, still no replication table
@@ -381,18 +387,7 @@ public class ReplicationIT extends ConfigurableMacIT {
Assert.assertFalse(ReplicationTable.isOnline(conn));
// Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-
- for (int rows = 0; rows < 50; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 50, 50);
// After the commit for these mutations finishes, we'll get a replication entry in accumulo.metadata for table1
// Don't want to compact table1 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
@@ -439,18 +434,7 @@ public class ReplicationIT extends ConfigurableMacIT {
conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
// Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-
- for (int rows = 0; rows < 50; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table2, 50, 50);
// After the commit on these mutations, we'll get a replication entry in accumulo.metadata for table2
// Don't want to compact table2 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
@@ -498,6 +482,19 @@ public class ReplicationIT extends ConfigurableMacIT {
Assert.assertFalse("Expected to only find two elements in replication table", iter.hasNext());
}
+ /**
+  * Writes a {@code rows} x {@code cols} grid of entries to {@code table}.
+  * <p>
+  * Row IDs are the decimal strings of {@code 0} to {@code rows - 1}. Within each row, every column index from {@code 0} to {@code cols - 1} is written as both
+  * the column family and the value, with an empty column qualifier.
+  * <p>
+  * The writer is always closed, even when adding a mutation fails, so a failing test does not leave an open {@link BatchWriter} behind.
+  *
+  * @param conn
+  *          connector used to create the writer
+  * @param table
+  *          name of the table to write to
+  * @param rows
+  *          number of rows (mutations) to write
+  * @param cols
+  *          number of columns to put in each row
+  */
+ private void writeSomeData(Connector conn, String table, int rows, int cols) throws Exception {
+   BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+   try {
+     for (int row = 0; row < rows; row++) {
+       Mutation m = new Mutation(Integer.toString(row));
+       for (int col = 0; col < cols; col++) {
+         String value = Integer.toString(col);
+         m.put(value, "", value);
+       }
+       bw.addMutation(m);
+     }
+   } finally {
+     // close() is what callers rely on to have the data committed before they continue
+     bw.close();
+   }
+ }
+
@Test
public void replicationEntriesPrecludeWalDeletion() throws Exception {
final Connector conn = getConnector();
@@ -529,53 +526,21 @@ public class ReplicationIT extends ConfigurableMacIT {
Thread.sleep(2000);
// Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 200, 500);
conn.tableOperations().create(table2);
conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
Thread.sleep(2000);
- // Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table2, 200, 500);
conn.tableOperations().create(table3);
conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
Thread.sleep(2000);
- // Write some data to table3
- bw = conn.createBatchWriter(table3, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table3, 200, 500);
// Force a write to metadata for the data written
for (String table : Arrays.asList(table1, table2, table3)) {
@@ -609,7 +574,8 @@ public class ReplicationIT extends ConfigurableMacIT {
// We should have *some* reference to each log that was seen in the metadata table
// They might not yet all be closed though (might be newfile)
- Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
+ Assert.assertTrue("Metadata log distribution: " + logs + "replFiles " + replFiles, logs.keySet().containsAll(replFiles));
+ Assert.assertTrue("Difference between replication entries and current logs is bigger than one", logs.keySet().size() - replFiles.size() <= 1);
for (String replFile : replFiles) {
Path p = new Path(replFile);
@@ -697,44 +663,11 @@ public class ReplicationIT extends ConfigurableMacIT {
conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
- // Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
+ writeSomeData(conn, table1, 200, 500);
- bw.close();
+ writeSomeData(conn, table2, 200, 500);
- // Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Write some data to table3
- bw = conn.createBatchWriter(table3, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table3, 200, 500);
// Flush everything to try to make the replication records
for (String table : Arrays.asList(table1, table2, table3)) {
@@ -789,10 +722,7 @@ public class ReplicationIT extends ConfigurableMacIT {
Set<String> wals = new HashSet<>();
for (Entry<Key,Value> entry : s) {
LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
- for (String file : logEntry.logSet) {
- Path p = new Path(file);
- wals.add(p.toString());
- }
+ wals.add(new Path(logEntry.filename).toString());
}
log.warn("Found wals {}", wals);
@@ -869,9 +799,7 @@ public class ReplicationIT extends ConfigurableMacIT {
public void singleTableWithSingleTarget() throws Exception {
// We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons
// against expected Status messages.
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
- cluster.killProcess(ServerType.GARBAGE_COLLECTOR, proc);
- }
+ getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
Connector conn = getConnector();
String table1 = "table1";
@@ -905,17 +833,7 @@ public class ReplicationIT extends ConfigurableMacIT {
}
// Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 2000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 2000, 50);
// Make sure the replication table is online at this point
boolean online = ReplicationTable.isOnline(conn);
@@ -1002,17 +920,7 @@ public class ReplicationIT extends ConfigurableMacIT {
}
// Write some more data so that we over-run the single WAL
- bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 3000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 3000, 50);
log.info("Issued compaction for table");
conn.tableOperations().compact(table1, null, null, true, true);
@@ -1085,17 +993,7 @@ public class ReplicationIT extends ConfigurableMacIT {
}
// Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 2000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 2000, 50);
conn.tableOperations().flush(table1, null, null, true);
String tableId = conn.tableOperations().tableIdMap().get(table1);
@@ -1151,10 +1049,7 @@ public class ReplicationIT extends ConfigurableMacIT {
@Test
public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception {
- Collection<ProcessReference> gcProcs = cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR);
- for (ProcessReference ref : gcProcs) {
- cluster.killProcess(ServerType.GARBAGE_COLLECTOR, ref);
- }
+ getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
final Connector conn = getConnector();
@@ -1185,7 +1080,6 @@ public class ReplicationIT extends ConfigurableMacIT {
String table1 = "table1", table2 = "table2", table3 = "table3";
- BatchWriter bw;
try {
conn.tableOperations().create(table1);
conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
@@ -1194,51 +1088,19 @@ public class ReplicationIT extends ConfigurableMacIT {
ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
// Write some data to table1
- bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 200, 500);
conn.tableOperations().create(table2);
conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
- // Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table2, 200, 500);
conn.tableOperations().create(table3);
conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
- // Write some data to table3
- bw = conn.createBatchWriter(table3, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table3, 200, 500);
// Flush everything to try to make the replication records
for (String table : Arrays.asList(table1, table2, table3)) {
@@ -1252,11 +1114,8 @@ public class ReplicationIT extends ConfigurableMacIT {
// Kill the tserver(s) and restart them
// to ensure that the WALs we previously observed all move to closed.
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
- cluster.killProcess(ServerType.TABLET_SERVER, proc);
- }
-
- cluster.exec(TabletServer.class);
+ cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
+ cluster.getClusterControl().start(ServerType.TABLET_SERVER);
// Make sure we can read all the tables (recovery complete)
for (String table : Arrays.asList(table1, table2, table3)) {
@@ -1359,9 +1218,7 @@ public class ReplicationIT extends ConfigurableMacIT {
@Test
public void replicatedStatusEntriesAreDeleted() throws Exception {
// Just stop it now, we'll restart it after we restart the tserver
- for (ProcessReference proc : getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
- getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, proc);
- }
+ getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
final Connector conn = getConnector();
log.info("Got connector to MAC");
@@ -1397,17 +1254,7 @@ public class ReplicationIT extends ConfigurableMacIT {
Assert.assertNotNull("Could not determine table id for " + table1, tableId);
// Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 2000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
+ writeSomeData(conn, table1, 2000, 50);
conn.tableOperations().flush(table1, null, null, true);
// Make sure the replication table exists at this point
@@ -1425,14 +1272,35 @@ public class ReplicationIT extends ConfigurableMacIT {
// Grant ourselves the write permission for later
conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+ log.info("Checking for replication entries in replication");
+ // Then we need to get those records over to the replication table
+ Scanner s;
+ Set<String> entries = new HashSet<>();
+ for (int i = 0; i < 5; i++) {
+ s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(ReplicationSection.getRange());
+ entries.clear();
+ for (Entry<Key,Value> entry : s) {
+ entries.add(entry.getKey().getRow().toString());
+ log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+ }
+ if (!entries.isEmpty()) {
+ log.info("Replication entries {}", entries);
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ Assert.assertFalse("Did not find any replication entries in the replication table", entries.isEmpty());
+
// Find the WorkSection record that will be created for that data we ingested
boolean notFound = true;
- Scanner s;
for (int i = 0; i < 10 && notFound; i++) {
try {
s = ReplicationTable.getScanner(conn);
WorkSection.limit(s);
Entry<Key,Value> e = Iterables.getOnlyElement(s);
+ log.info("Found entry: " + e.getKey().toStringNoTruncate());
Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
notFound = false;
@@ -1483,14 +1351,13 @@ public class ReplicationIT extends ConfigurableMacIT {
log.info("Killing tserver");
// Kill the tserver(s) and restart them
// to ensure that the WALs we previously observed all move to closed.
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
- cluster.killProcess(ServerType.TABLET_SERVER, proc);
- }
+ cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
log.info("Starting tserver");
- cluster.exec(TabletServer.class);
+ cluster.getClusterControl().start(ServerType.TABLET_SERVER);
log.info("Waiting to read tables");
+ UtilWaitThread.sleep(2 * 3 * 1000);
// Make sure we can read all the tables (recovery complete)
for (String table : new String[] {MetadataTable.NAME, table1}) {
@@ -1499,55 +1366,48 @@ public class ReplicationIT extends ConfigurableMacIT {
Entry<Key,Value> entry : s) {}
}
- log.info("Checking for replication entries in replication");
- // Then we need to get those records over to the replication table
- boolean foundResults = false;
- for (int i = 0; i < 5; i++) {
- s = ReplicationTable.getScanner(conn);
- int count = 0;
- for (Entry<Key,Value> entry : s) {
- count++;
- log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
- }
- if (count > 0) {
- foundResults = true;
- break;
- }
- Thread.sleep(1000);
+ log.info("Recovered metadata:");
+ s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ for (Entry<Key,Value> entry : s) {
+ log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
}
- Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
-
- getCluster().exec(SimpleGarbageCollector.class);
+ cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
// Wait for a bit since the GC has to run (should be running after a one second delay)
waitForGCLock(conn);
Thread.sleep(1000);
+ log.info("After GC");
+ s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ for (Entry<Key,Value> entry : s) {
+ log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+ }
+
// We expect no records in the metadata table after compaction. We have to poll
// because we have to wait for the StatusMaker's next iteration which will clean
// up the dangling *closed* records after we create the record in the replication table.
// We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
log.info("Checking metadata table for replication entries");
- foundResults = true;
+ Set<String> remaining = new HashSet<>();
for (int i = 0; i < 10; i++) {
s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
s.setRange(ReplicationSection.getRange());
- long size = 0;
+ remaining.clear();
for (Entry<Key,Value> e : s) {
- size++;
- log.info("{}={}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get())));
+ remaining.add(e.getKey().getRow().toString());
}
- if (size == 0) {
- foundResults = false;
+ remaining.retainAll(entries);
+ if (remaining.isEmpty()) {
break;
}
+ log.info("remaining {}", remaining);
Thread.sleep(2000);
log.info("");
}
- Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults);
+ Assert.assertTrue("Replication status messages were not cleaned up from metadata table", remaining.isEmpty());
/**
* After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be
@@ -1560,10 +1420,10 @@ public class ReplicationIT extends ConfigurableMacIT {
recordsFound = 0;
for (Entry<Key,Value> entry : s) {
recordsFound++;
- log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+ log.info("{} {}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
}
- if (0 == recordsFound) {
+ if (recordsFound <= 2) {
break;
} else {
Thread.sleep(1000);
@@ -1571,6 +1431,6 @@ public class ReplicationIT extends ConfigurableMacIT {
}
}
- Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
+ Assert.assertTrue("Found unexpected replication records in the replication table", recordsFound <= 2);
}
}
[4/5] accumulo git commit: ACCUMULO-3423 optimize WAL metadata table
updates
Posted by ec...@apache.org.
ACCUMULO-3423 optimize WAL metadata table updates
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3fdd29f5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3fdd29f5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3fdd29f5
Branch: refs/heads/1.7
Commit: 3fdd29f5222f9d1d32ca28b5ecf1d740a8d20f87
Parents: ea25e98
Author: Eric C. Newton <er...@gmail.com>
Authored: Fri Apr 24 18:15:05 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Fri Apr 24 18:18:56 2015 -0400
----------------------------------------------------------------------
.../client/impl/ReplicationOperationsImpl.java | 4 +-
.../org/apache/accumulo/core/conf/Property.java | 4 +-
.../accumulo/core/metadata/RootTable.java | 1 +
.../core/metadata/schema/MetadataSchema.java | 48 ++
.../core/tabletserver/log/LogEntry.java | 78 ++-
.../core/metadata/MetadataTableSchemaTest.java | 47 ++
.../org/apache/accumulo/server/TabletLevel.java | 34 ++
.../apache/accumulo/server/fs/VolumeUtil.java | 22 +-
.../apache/accumulo/server/init/Initialize.java | 1 +
.../server/master/state/MetaDataStateStore.java | 47 +-
.../master/state/MetaDataTableScanner.java | 6 +-
.../master/state/TabletLocationState.java | 7 +
.../server/master/state/TabletStateStore.java | 16 +-
.../master/state/ZooTabletStateStore.java | 35 +-
.../accumulo/server/replication/StatusUtil.java | 13 +
.../accumulo/server/util/ListVolumesUsed.java | 18 +-
.../server/util/MasterMetadataUtil.java | 18 +-
.../accumulo/server/util/MetadataTableUtil.java | 239 +++++---
.../server/util/ReplicationTableUtil.java | 13 +-
.../server/util/ReplicationTableUtilTest.java | 2 +-
.../gc/GarbageCollectWriteAheadLogs.java | 499 +++++++---------
.../accumulo/gc/SimpleGarbageCollector.java | 1 -
.../CloseWriteAheadLogReferences.java | 23 +-
.../gc/GarbageCollectWriteAheadLogsTest.java | 567 -------------------
.../CloseWriteAheadLogReferencesTest.java | 151 +----
.../java/org/apache/accumulo/master/Master.java | 3 +
.../master/MasterClientServiceHandler.java | 3 +-
.../accumulo/master/TabletGroupWatcher.java | 37 +-
.../accumulo/master/replication/WorkMaker.java | 1 +
.../accumulo/master/state/MergeStats.java | 3 +-
.../master/ReplicationOperationsImplTest.java | 9 +-
.../apache/accumulo/master/TestMergeState.java | 2 +-
.../master/state/RootTabletStateStoreTest.java | 4 +-
.../src/main/findbugs/exclude-filter.xml | 2 +-
.../server/GarbageCollectionLogger.java | 3 +-
.../apache/accumulo/tserver/TabletServer.java | 182 +++---
.../apache/accumulo/tserver/log/DfsLogger.java | 14 +-
.../accumulo/tserver/log/SortedLogRecovery.java | 8 +-
.../tserver/log/TabletServerLogger.java | 187 +++---
.../accumulo/tserver/tablet/CommitSession.java | 3 +-
.../tserver/tablet/DatafileManager.java | 4 +-
.../apache/accumulo/tserver/tablet/Tablet.java | 59 +-
.../tserver/tablet/TabletCommitter.java | 3 +-
.../accumulo/tserver/log/LogEntryTest.java | 56 ++
.../test/performance/thrift/NullTserver.java | 6 +-
.../accumulo/proxy/ProxyDurabilityIT.java | 9 +-
.../test/BadDeleteMarkersCreatedIT.java | 2 +-
.../org/apache/accumulo/test/BalanceIT.java | 20 +-
.../org/apache/accumulo/test/CleanWalIT.java | 1 +
.../accumulo/test/ConditionalWriterIT.java | 1 +
.../accumulo/test/GarbageCollectWALIT.java | 81 +++
.../MissingWalHeaderCompletesRecoveryIT.java | 14 +-
.../accumulo/test/NoMutationRecoveryIT.java | 178 ------
.../org/apache/accumulo/test/ShellServerIT.java | 2 +-
.../org/apache/accumulo/test/UnusedWALIT.java | 144 +++++
.../java/org/apache/accumulo/test/VolumeIT.java | 17 +
.../accumulo/test/functional/ReadWriteIT.java | 8 +
.../accumulo/test/functional/WALSunnyDayIT.java | 250 ++++++++
.../test/functional/WatchTheWatchCountIT.java | 2 +-
.../test/performance/RollWALPerformanceIT.java | 126 +++++
...bageCollectorCommunicatesWithTServersIT.java | 35 +-
.../replication/MultiInstanceReplicationIT.java | 2 +-
.../test/replication/ReplicationIT.java | 370 ++++--------
63 files changed, 1857 insertions(+), 1888 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 6a5c74a..925877d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -153,9 +153,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
try {
for (Entry<Key,Value> entry : metaBs) {
LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
- for (String log : logEntry.logSet) {
- wals.add(new Path(log).toString());
- }
+ wals.add(new Path(logEntry.filename).toString());
}
} finally {
metaBs.close();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 429abad..a5bef0a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -345,8 +345,8 @@ public enum Property {
+ "no longer in use are removed from the filesystem."),
GC_PORT("gc.port.client", "50091", PropertyType.PORT, "The listening port for the garbage collector's monitor service"),
GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, "The number of threads used to delete files"),
- GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured"),
- GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting"),
+ GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured."),
+ GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting."),
GC_TRACE_PERCENT("gc.trace.percent", "0.01", PropertyType.FRACTION, "Percent of gc cycles to trace"),
// properties that are specific to the monitor server behavior
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
index 292ba3b..97d73d1 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
@@ -41,6 +41,7 @@ public class RootTable {
public static final String ZROOT_TABLET_FUTURE_LOCATION = ZROOT_TABLET + "/future_location";
public static final String ZROOT_TABLET_LAST_LOCATION = ZROOT_TABLET + "/lastlocation";
public static final String ZROOT_TABLET_WALOGS = ZROOT_TABLET + "/walogs";
+ public static final String ZROOT_TABLET_CURRENT_LOGS = ZROOT_TABLET + "/current_logs";
public static final String ZROOT_TABLET_PATH = ZROOT_TABLET + "/dir";
public static final KeyExtent EXTENT = new KeyExtent(new Text(ID), null, null);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 6baae17..c787d6d 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -16,11 +16,14 @@
*/
package org.apache.accumulo.core.metadata.schema;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.schema.Section;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.hadoop.io.Text;
@@ -278,4 +281,49 @@ public class MetadataSchema {
buff.set(buff.getBytes(), section.getRowPrefix().length(), buff.getLength() - section.getRowPrefix().length());
}
}
+
+ /**
+  * Holds references to the WALs in use in a live Tablet Server.
+  * <p>
+  * <code>~wal+tserver:port[sessionId] log:hdfs://localhost:8020/accumulo/wal/tserver+port/WAL [] -></code>
+  * <p>
+  * The row is the section prefix followed by <code>host:port[sessionId]</code>; the column family is {@link #COLF} and the column qualifier is the path of the
+  * WAL.
+  */
+ public static class CurrentLogsSection {
+   private static final Section section = new Section(RESERVED_PREFIX + "wal+", true, RESERVED_PREFIX + "wal,", false);
+   // Separates the host:port part of the row from the session id.
+   private static final byte LEFT_BRACKET = (byte) '[';
+   public static final Text COLF = new Text("log");
+   public static final Value UNUSED = new Value("unused".getBytes(UTF_8));
+
+   /**
+    * @return the range covering every row in this section
+    */
+   public static Range getRange() {
+     return section.getRange();
+   }
+
+   /**
+    * @return the prefix that every row in this section starts with
+    */
+   public static String getRowPrefix() {
+     return section.getRowPrefix();
+   }
+
+   /**
+    * Splits the row of a current-log key into its tablet server address and session id.
+    *
+    * @param k
+    *          key whose row has the form <code>prefix + host:port[sessionId]</code>
+    * @param hostPort
+    *          output: set to the bytes between the section prefix and the first '['
+    * @param session
+    *          output: set to the bytes after the first '[' up to, but not including, the last byte of the row
+    * @throws IllegalArgumentException
+    *           if the row does not start with the section prefix or contains no '[' before its last byte
+    */
+   public static void getTabletServer(Key k, Text hostPort, Text session) {
+     Preconditions.checkNotNull(k);
+     Preconditions.checkNotNull(hostPort);
+     Preconditions.checkNotNull(session);
+
+     Text row = new Text();
+     k.getRow(row);
+     final String prefix = section.getRowPrefix();
+     if (!row.toString().startsWith(prefix)) {
+       throw new IllegalArgumentException("Bad key " + k.toString());
+     }
+     final int hostStart = prefix.length();
+     for (int sessionStart = hostStart; sessionStart < row.getLength() - 1; sessionStart++) {
+       if (row.charAt(sessionStart) == LEFT_BRACKET) {
+         hostPort.set(row.getBytes(), hostStart, sessionStart - hostStart);
+         // skip the '[' and drop the final byte of the row (the closing ']' in the documented row format)
+         session.set(row.getBytes(), sessionStart + 1, row.getLength() - sessionStart - 2);
+         return;
+       }
+     }
+     throw new IllegalArgumentException("Bad key " + k.toString());
+   }
+
+   /**
+    * Copies the column qualifier of {@code k} (the WAL path in the documented key format) into {@code path}.
+    */
+   public static void getPath(Key k, Text path) {
+     k.getColumnQualifier(path);
+   }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
index 7fe61d1..ab70bb0 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
@@ -16,10 +16,10 @@
*/
package org.apache.accumulo.core.tabletserver.log;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -29,30 +29,29 @@ import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
-import com.google.common.base.Joiner;
-
public class LogEntry {
- public KeyExtent extent;
- public long timestamp;
- public String server;
- public String filename;
- public int tabletId;
- public Collection<String> logSet;
-
- public LogEntry() {}
+ public final KeyExtent extent;
+ public final long timestamp;
+ public final String server;
+ public final String filename;
public LogEntry(LogEntry le) {
this.extent = le.extent;
this.timestamp = le.timestamp;
this.server = le.server;
this.filename = le.filename;
- this.tabletId = le.tabletId;
- this.logSet = new ArrayList<String>(le.logSet);
+ }
+
+ public LogEntry(KeyExtent extent, long timestamp, String server, String filename) {
+ this.extent = extent;
+ this.timestamp = timestamp;
+ this.server = server;
+ this.filename = filename;
}
@Override
public String toString() {
- return extent.toString() + " " + filename + " (" + tabletId + ")";
+ return extent.toString() + " " + filename;
}
public String getName() {
@@ -65,43 +64,35 @@ public class LogEntry {
out.writeLong(timestamp);
out.writeUTF(server);
out.writeUTF(filename);
- out.write(tabletId);
- out.write(logSet.size());
- for (String s : logSet) {
- out.writeUTF(s);
- }
return Arrays.copyOf(out.getData(), out.getLength());
}
- public void fromBytes(byte bytes[]) throws IOException {
+ static public LogEntry fromBytes(byte bytes[]) throws IOException {
DataInputBuffer inp = new DataInputBuffer();
inp.reset(bytes, bytes.length);
- extent = new KeyExtent();
+ KeyExtent extent = new KeyExtent();
extent.readFields(inp);
- timestamp = inp.readLong();
- server = inp.readUTF();
- filename = inp.readUTF();
- tabletId = inp.read();
- int count = inp.read();
- ArrayList<String> logSet = new ArrayList<String>(count);
- for (int i = 0; i < count; i++)
- logSet.add(inp.readUTF());
- this.logSet = logSet;
+ long timestamp = inp.readLong();
+ String server = inp.readUTF();
+ String filename = inp.readUTF();
+ return new LogEntry(extent, timestamp, server, filename);
}
static private final Text EMPTY_TEXT = new Text();
public static LogEntry fromKeyValue(Key key, Value value) {
- LogEntry result = new LogEntry();
- result.extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
+ String qualifier = key.getColumnQualifier().toString();
+ if (qualifier.indexOf('/') < 1) {
+ throw new IllegalArgumentException("Bad key for log entry: " + key);
+ }
+ KeyExtent extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
String[] parts = key.getColumnQualifier().toString().split("/", 2);
- result.server = parts[0];
- result.filename = parts[1];
- parts = value.toString().split("\\|");
- result.tabletId = Integer.parseInt(parts[1]);
- result.logSet = Arrays.asList(parts[0].split(";"));
- result.timestamp = key.getTimestamp();
- return result;
+ String server = parts[0];
+ // handle old-style log entries that specify log sets
+ parts = value.toString().split("\\|")[0].split(";");
+ String filename = parts[parts.length - 1];
+ long timestamp = key.getTimestamp();
+ return new LogEntry(extent, timestamp, server, filename);
}
public Text getRow() {
@@ -112,11 +103,16 @@ public class LogEntry {
return MetadataSchema.TabletsSection.LogColumnFamily.NAME;
}
+  /**
+   * Returns the last "/"-separated component of {@code filename}.
+   *
+   * @return the text following the final "/" produced by splitting {@code filename} on "/"
+   */
+  public String getUniqueID() {
+    final String[] segments = filename.split("/");
+    final int last = segments.length - 1;
+    return segments[last];
+  }
+
public Text getColumnQualifier() {
return new Text(server + "/" + filename);
}
public Value getValue() {
- return new Value((Joiner.on(";").join(logSet) + "|" + tabletId).getBytes());
+ return new Value(filename.getBytes(UTF_8));
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
new file mode 100644
index 0000000..cfe59f2
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.metadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class MetadataTableSchemaTest {
+
+  /**
+   * Checks that {@code CurrentLogsSection.getTabletServer} extracts the host:port and session parts from a row of the form
+   * {@code ~wal+host:port[session]}, and that it throws {@link IllegalArgumentException} for a row that has "/" in place of "+".
+   */
+  @Test
+  public void testGetTabletServer() throws Exception {
+    final String logPath = "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3";
+    final Key wellFormed = new Key("~wal+host:43861[14a7df0e6420003]", "log", logPath);
+    final Text hostPort = new Text();
+    final Text session = new Text();
+
+    // well-formed row: both output arguments are filled in
+    CurrentLogsSection.getTabletServer(wellFormed, hostPort, session);
+    assertEquals("host:43861", hostPort.toString());
+    assertEquals("14a7df0e6420003", session.toString());
+
+    // malformed row: the call must not return normally
+    try {
+      final Key malformed = new Key("~wal/host:43861[14a7df0e6420003]", "log", logPath);
+      CurrentLogsSection.getTabletServer(malformed, hostPort, session);
+      fail("bad argument not thrown");
+    } catch (IllegalArgumentException expected) {
+      // expected: this is the outcome the test is looking for
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java b/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
new file mode 100644
index 0000000..91e5ee9
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import org.apache.accumulo.core.data.KeyExtent;
+
+/**
+ * Classification of a tablet, derived from what its {@link KeyExtent} reports via {@code isMeta()} and {@code isRootTablet()}.
+ */
+public enum TabletLevel {
+  ROOT,
+  META,
+  NORMAL;
+
+  /**
+   * Determines the level of the tablet described by the given extent.
+   *
+   * @param extent
+   *          the extent to classify
+   * @return {@link #NORMAL} when {@code extent.isMeta()} is false; otherwise {@link #ROOT} when {@code extent.isRootTablet()} is true, and {@link #META}
+   *         in the remaining case
+   */
+  public static TabletLevel getLevel(KeyExtent extent) {
+    if (extent.isMeta()) {
+      return extent.isRootTablet() ? ROOT : META;
+    }
+    return NORMAL;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index c3595cd..4722e60 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -128,15 +128,12 @@ public class VolumeUtil {
switchedPath = le.filename;
ArrayList<String> switchedLogs = new ArrayList<String>();
- for (String log : le.logSet) {
- String switchedLog = switchVolume(le.filename, FileType.WAL, replacements);
- if (switchedLog != null) {
- switchedLogs.add(switchedLog);
- numSwitched++;
- } else {
- switchedLogs.add(log);
- }
-
+ String switchedLog = switchVolume(le.filename, FileType.WAL, replacements);
+ if (switchedLog != null) {
+ switchedLogs.add(switchedLog);
+ numSwitched++;
+ } else {
+ switchedLogs.add(le.filename);
}
if (numSwitched == 0) {
@@ -144,9 +141,7 @@ public class VolumeUtil {
return null;
}
- LogEntry newLogEntry = new LogEntry(le);
- newLogEntry.filename = switchedPath;
- newLogEntry.logSet = switchedLogs;
+ LogEntry newLogEntry = new LogEntry(le.extent, le.timestamp, le.server, switchedPath);
log.trace("Switched " + le + " to " + newLogEntry);
@@ -244,7 +239,7 @@ public class VolumeUtil {
log.debug("Tablet directory switched, need to record old log files " + logsToRemove + " " + ProtobufUtil.toString(status));
// Before deleting these logs, we need to mark them for replication
for (LogEntry logEntry : logsToRemove) {
- ReplicationTableUtil.updateFiles(context, extent, logEntry.logSet, status);
+ ReplicationTableUtil.updateFiles(context, extent, logEntry.filename, status);
}
}
}
@@ -253,7 +248,6 @@ public class VolumeUtil {
// method this should return the exact strings that are in the metadata table
return ret;
-
}
private static String decommisionedTabletDir(AccumuloServerContext context, ZooLock zooLock, VolumeManager vm, KeyExtent extent, String metaDir)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index c6f1dd8..9afb93f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -533,6 +533,7 @@ public class Initialize implements KeywordExecutable {
zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
+ zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_CURRENT_LOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_PATH, rootTabletDir.getBytes(UTF_8), NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index 7ee6f0c..c154bd0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -17,6 +17,9 @@
package org.apache.accumulo.server.master.state;
import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.BatchWriter;
@@ -27,9 +30,14 @@ import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
public class MetaDataStateStore extends TabletStateStore {
+ private static final Logger log = Logger.getLogger(MetaDataStateStore.class);
private static final int THREADS = 4;
private static final int LATENCY = 1000;
@@ -59,7 +67,7 @@ public class MetaDataStateStore extends TabletStateStore {
@Override
public ClosableIterator<TabletLocationState> iterator() {
- return new MetaDataTableScanner(context, MetadataSchema.TabletsSection.getRange(), state);
+ return new MetaDataTableScanner(context, MetadataSchema.TabletsSection.getRange(), state, targetTableName);
}
@Override
@@ -116,7 +124,7 @@ public class MetaDataStateStore extends TabletStateStore {
}
@Override
- public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
+ public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException {
BatchWriter writer = createBatchWriter();
try {
@@ -124,6 +132,15 @@ public class MetaDataStateStore extends TabletStateStore {
Mutation m = new Mutation(tls.extent.getMetadataEntry());
if (tls.current != null) {
tls.current.clearLocation(m);
+ if (logsForDeadServers != null) {
+ List<Path> logs = logsForDeadServers.get(tls.current);
+ if (logs != null) {
+ for (Path log : logs) {
+ LogEntry entry = new LogEntry(tls.extent, 0, tls.current.hostPort(), log.toString());
+ m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
+ }
+ }
+ }
}
if (tls.future != null) {
tls.future.clearFutureLocation(m);
@@ -145,4 +162,30 @@ public class MetaDataStateStore extends TabletStateStore {
public String name() {
return "Normal Tablets";
}
+
+  /**
+   * Writes an {@code UNUSED} marker for each of the given logs into the current-logs section.
+   * <p>
+   * One mutation is written per server that has at least one log. Its row is {@code CurrentLogsSection.getRowPrefix()} followed by the server's string form,
+   * and each log contributes one entry with column family {@code CurrentLogsSection.COLF} and the log path as column qualifier. Servers with an empty log
+   * list are skipped.
+   *
+   * @param context
+   *          not used by this implementation
+   * @param logs
+   *          the logs to mark, grouped by the server they belong to
+   * @throws DistributedStoreException
+   *           if building or queueing a mutation fails, or if closing the writer reports rejected mutations
+   */
+  @Override
+  public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) throws DistributedStoreException {
+    BatchWriter writer = createBatchWriter();
+    try {
+      for (Entry<TServerInstance,List<Path>> entry : logs.entrySet()) {
+        if (entry.getValue().isEmpty()) {
+          continue;
+        }
+        Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
+        // named "path" rather than "log" so the class logger is not shadowed inside the loop
+        for (Path path : entry.getValue()) {
+          m.put(MetadataSchema.CurrentLogsSection.COLF, new Text(path.toString()), MetadataSchema.CurrentLogsSection.UNUSED);
+        }
+        writer.addMutation(m);
+      }
+    } catch (Exception ex) {
+      // pass the exception to the logger so its cause and stack trace appear in the log
+      log.error("Error marking logs as unused: " + logs, ex);
+      throw new DistributedStoreException(ex);
+    } finally {
+      try {
+        writer.close();
+      } catch (MutationsRejectedException e) {
+        throw new DistributedStoreException(e);
+      }
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
index d64c108..bec2dc4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -141,6 +141,7 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
boolean chopped = false;
for (Entry<Key,Value> entry : decodedRow.entrySet()) {
+
Key key = entry.getKey();
Text row = key.getRow();
Text cf = key.getColumnFamily();
@@ -173,8 +174,9 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
}
}
if (extent == null) {
- log.warn("No prev-row for key extent: " + decodedRow);
- return null;
+ String msg = "No prev-row for key extent " + decodedRow;
+ log.error(msg);
+ throw new BadLocationStateException(msg, k.getRow());
}
return new TabletLocationState(extent, future, current, last, walogs, chopped);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
index fb30440..8116ecf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
@@ -68,6 +68,13 @@ public class TabletLocationState {
final public Collection<Collection<String>> walogs;
final public boolean chopped;
+  /**
+   * Picks the server associated with this tablet, preferring the current location over the future one.
+   *
+   * @return {@code current} when it is non-null; otherwise {@code future}, which is returned without a null check
+   */
+  public TServerInstance futureOrCurrent() {
+    return current != null ? current : future;
+  }
+
@Override
public String toString() {
return extent + "@(" + future + "," + current + "," + last + ")" + (chopped ? " chopped" : "");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
index 5413e31..acc10d8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@ -18,8 +18,11 @@ package org.apache.accumulo.server.master.state;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.hadoop.fs.Path;
/**
* Interface for storing information about tablet assignments. There are three implementations:
@@ -56,10 +59,12 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
*
* @param tablets
* the tablets' current information
+ * @param logsForDeadServers
+ * a cache of logs in use by servers when they died
*/
- abstract public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException;
+ abstract public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException;
- public static void unassign(AccumuloServerContext context, TabletLocationState tls) throws DistributedStoreException {
+ public static void unassign(AccumuloServerContext context, TabletLocationState tls, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException {
TabletStateStore store;
if (tls.extent.isRootTablet()) {
store = new ZooTabletStateStore();
@@ -68,7 +73,7 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
} else {
store = new MetaDataStateStore(context);
}
- store.unassign(Collections.singletonList(tls));
+ store.unassign(Collections.singletonList(tls), logsForDeadServers);
}
public static void setLocation(AccumuloServerContext context, Assignment assignment) throws DistributedStoreException {
@@ -83,4 +88,9 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
store.setLocations(Collections.singletonList(assignment));
}
+ /**
+ * When a server fails, its logs must be marked as unused after the log markers are moved to the tablets.
+ */
+ abstract public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance, List<Path>> logs) throws DistributedStoreException;
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
index ab99396..bce20fd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@ -21,12 +21,17 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,10 +90,9 @@ public class ZooTabletStateStore extends TabletStateStore {
for (String entry : store.getChildren(RootTable.ZROOT_TABLET_WALOGS)) {
byte[] logInfo = store.get(RootTable.ZROOT_TABLET_WALOGS + "/" + entry);
if (logInfo != null) {
- LogEntry logEntry = new LogEntry();
- logEntry.fromBytes(logInfo);
- logs.add(logEntry.logSet);
- log.debug("root tablet logSet " + logEntry.logSet);
+ LogEntry logEntry = LogEntry.fromBytes(logInfo);
+ logs.add(Collections.singleton(logEntry.filename));
+ log.debug("root tablet log " + logEntry.filename);
}
}
TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, logs, false);
@@ -161,12 +165,28 @@ public class ZooTabletStateStore extends TabletStateStore {
}
@Override
- public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
+ public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException {
if (tablets.size() != 1)
throw new IllegalArgumentException("There is only one root tablet");
TabletLocationState tls = tablets.iterator().next();
if (tls.extent.compareTo(RootTable.EXTENT) != 0)
throw new IllegalArgumentException("You can only store the root tablet location");
+ if (logsForDeadServers != null) {
+ List<Path> logs = logsForDeadServers.get(tls.futureOrCurrent());
+ if (logs != null) {
+ for (Path entry : logs) {
+ LogEntry logEntry = new LogEntry(RootTable.EXTENT, System.currentTimeMillis(), tls.futureOrCurrent().getLocation().toString(), entry.toString());
+ byte[] value;
+ try {
+ value = logEntry.toBytes();
+ } catch (IOException ex) {
+ throw new DistributedStoreException(ex);
+ }
+ store.put(RootTable.ZROOT_TABLET_WALOGS + "/" + logEntry.getUniqueID(), value);
+ store.remove(RootTable.ZROOT_TABLET_CURRENT_LOGS + "/" + MetadataSchema.CurrentLogsSection.getRowPrefix() + tls.current.toString() + logEntry.getUniqueID());
+ }
+ }
+ }
store.remove(RootTable.ZROOT_TABLET_LOCATION);
store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
log.debug("unassign root tablet location");
@@ -177,4 +197,9 @@ public class ZooTabletStateStore extends TabletStateStore {
return "Root Table";
}
+ @Override
+ public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) {
+ // intentionally a no-op: the root table is not replicated, so unassigning the root tablet has removed the current log marker
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
index 898e3d4..d72eea2 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
@@ -153,6 +153,19 @@ public class StatusUtil {
/**
* @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
*/
+  public static Status openWithUnknownLength(long timeCreated) {
+    // begin and end are both 0, the infinite-end flag is set, the file is not closed, and the caller's creation time is recorded
+    return Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).setCreatedTime(timeCreated).build();
+  }
+
+ /**
+ * @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
+ */
public static Status openWithUnknownLength() {
return INF_END_REPLICATION_STATUS;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index e90d1dd..9e3fc7d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
/**
*
@@ -61,9 +62,6 @@ public class ListVolumesUsed {
private static void getLogURIs(TreeSet<String> volumes, LogEntry logEntry) {
volumes.add(getLogURI(logEntry.filename));
- for (String logSet : logEntry.logSet) {
- volumes.add(getLogURI(logSet));
- }
}
private static void listZookeeper() throws Exception {
@@ -123,6 +121,20 @@ public class ListVolumesUsed {
for (String volume : volumes)
System.out.println("\tVolume : " + volume);
+
+ volumes.clear();
+ scanner.clearColumns();
+ scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
+ Text path = new Text();
+ for (Entry<Key,Value> entry : scanner) {
+ MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+ volumes.add(getLogURI(path.toString()));
+ }
+
+ System.out.println("Listing volumes referenced in " + name + " current logs section");
+
+ for (String volume : volumes)
+ System.out.println("\tVolume : " + volume);
}
public static void listVolumes(ClientContext context) throws Exception {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index 14eba68..4a5650e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -248,35 +248,27 @@ public class MasterMetadataUtil {
if (unusedWalLogs != null) {
updateRootTabletDataFile(extent, path, mergeFile, dfv, time, filesInUseByScans, address, zooLock, unusedWalLogs, lastLocation, flushId);
}
-
return;
}
-
Mutation m = getUpdateForTabletDataFile(extent, path, mergeFile, dfv, time, filesInUseByScans, address, zooLock, unusedWalLogs, lastLocation, flushId);
-
MetadataTableUtil.update(context, zooLock, m, extent);
-
}
/**
* Update the data file for the root tablet
*/
- protected static void updateRootTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
+ private static void updateRootTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
Set<FileRef> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
IZooReaderWriter zk = ZooReaderWriter.getInstance();
- // unusedWalLogs will contain the location/name of each log in a log set
- // the log set is stored under one of the log names, but not both
- // find the entry under one of the names and delete it.
String root = MetadataTableUtil.getZookeeperLogLocation();
- boolean foundEntry = false;
for (String entry : unusedWalLogs) {
String[] parts = entry.split("/");
String zpath = root + "/" + parts[parts.length - 1];
while (true) {
try {
if (zk.exists(zpath)) {
+ log.debug("Removing WAL reference for root table " + zpath);
zk.recursiveDelete(zpath, NodeMissingPolicy.SKIP);
- foundEntry = true;
}
break;
} catch (KeeperException e) {
@@ -287,16 +279,15 @@ public class MasterMetadataUtil {
UtilWaitThread.sleep(1000);
}
}
- if (unusedWalLogs.size() > 0 && !foundEntry)
- log.warn("WALog entry for root tablet did not exist " + unusedWalLogs);
}
+
/**
* Create an update that updates a tablet
*
* @return A Mutation to update a tablet from the given information
*/
- protected static Mutation getUpdateForTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
+ private static Mutation getUpdateForTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
Set<FileRef> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
Mutation m = new Mutation(extent.getMetadataEntry());
@@ -324,6 +315,7 @@ public class MasterMetadataUtil {
TabletsSection.ServerColumnFamily.FLUSH_COLUMN.put(m, new Value(Long.toString(flushId).getBytes(UTF_8)));
+
return m;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 5e74aac..4470c55 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -23,8 +23,6 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -61,6 +59,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
@@ -82,10 +81,12 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.TabletLevel;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.tablets.TabletTime;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -121,7 +122,7 @@ public class MetadataTableUtil {
return metadataTable;
}
- private synchronized static Writer getRootTable(ClientContext context) {
+ public synchronized static Writer getRootTable(ClientContext context) {
Credentials credentials = context.getCredentials();
Writer rootTable = root_tables.get(credentials);
if (rootTable == null) {
@@ -223,7 +224,7 @@ public class MetadataTableUtil {
// add before removing in case of process death
for (LogEntry logEntry : logsToAdd)
- addLogEntry(context, logEntry, zooLock);
+ addRootLogEntry(context, zooLock, logEntry);
removeUnusedWALEntries(context, extent, logsToRemove, zooLock);
} else {
@@ -248,6 +249,35 @@ public class MetadataTableUtil {
}
}
+  /**
+   * A single piece of ZooKeeper work that is handed the reader/writer to operate on.
+   */
+  private interface ZooOperation {
+    /**
+     * Performs the work using the supplied ZooKeeper reader/writer.
+     */
+    void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException;
+  }
+
+  /**
+   * Runs the operation against ZooKeeper, retrying until one attempt completes without throwing.
+   * <p>
+   * The operation is invoked only when {@code isLockHeld} reports the given lock as held; when it does not, the attempt still counts as complete and the
+   * operation is never run. Any exception from an attempt is logged, followed by a call to {@code UtilWaitThread.sleep(1000)} before the next attempt.
+   *
+   * @param context
+   *          not used by this method
+   * @param zooLock
+   *          the lock whose ID is checked before each attempt
+   * @param op
+   *          the work to perform
+   */
+  private static void retryZooKeeperUpdate(ClientContext context, ZooLock zooLock, ZooOperation op) {
+    boolean finished = false;
+    while (!finished) {
+      try {
+        IZooReaderWriter zk = ZooReaderWriter.getInstance();
+        boolean lockHeld = zk.isLockHeld(zooLock.getLockID());
+        if (lockHeld) {
+          op.run(zk);
+        }
+        finished = true;
+      } catch (Exception e) {
+        log.error("Unexpected exception {}", e.getMessage(), e);
+        // pause before the next attempt
+        UtilWaitThread.sleep(1000);
+      }
+    }
+  }
+
+  /**
+   * Stores the serialized log entry in ZooKeeper beneath the path returned by {@code getZookeeperLogLocation()}, in a node named after the entry's unique ID.
+   * An existing node of that name is overwritten. The write goes through {@code retryZooKeeperUpdate}.
+   */
+  private static void addRootLogEntry(AccumuloServerContext context, ZooLock zooLock, final LogEntry entry) {
+    ZooOperation putEntry = new ZooOperation() {
+      @Override
+      public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+        String zpath = getZookeeperLogLocation() + "/" + entry.getUniqueID();
+        rw.putPersistentData(zpath, entry.toBytes(), NodeExistsPolicy.OVERWRITE);
+      }
+    };
+    retryZooKeeperUpdate(context, zooLock, putEntry);
+  }
+
public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, ClientContext context) throws IOException {
TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
@@ -447,34 +477,6 @@ public class MetadataTableUtil {
return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
}
- public static void addLogEntry(ClientContext context, LogEntry entry, ZooLock zooLock) {
- if (entry.extent.isRootTablet()) {
- String root = getZookeeperLogLocation();
- while (true) {
- try {
- IZooReaderWriter zoo = ZooReaderWriter.getInstance();
- if (zoo.isLockHeld(zooLock.getLockID())) {
- String[] parts = entry.filename.split("/");
- String uniqueId = parts[parts.length - 1];
- zoo.putPersistentData(root + "/" + uniqueId, entry.toBytes(), NodeExistsPolicy.OVERWRITE);
- }
- break;
- } catch (KeeperException e) {
- log.error("{}", e.getMessage(), e);
- } catch (InterruptedException e) {
- log.error("{}", e.getMessage(), e);
- } catch (IOException e) {
- log.error("{}", e.getMessage(), e);
- }
- UtilWaitThread.sleep(1000);
- }
- } else {
- Mutation m = new Mutation(entry.getRow());
- m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
- update(context, zooLock, m, entry.extent);
- }
- }
-
public static void setRootTabletDir(String dir) throws IOException {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
String zpath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_PATH;
@@ -565,22 +567,11 @@ public class MetadataTableUtil {
}
}
- Collections.sort(result, new Comparator<LogEntry>() {
- @Override
- public int compare(LogEntry o1, LogEntry o2) {
- long diff = o1.timestamp - o2.timestamp;
- if (diff < 0)
- return -1;
- if (diff > 0)
- return 1;
- return 0;
- }
- });
log.info("Returning logs " + result + " for extent " + extent);
return result;
}
- static void getRootLogEntries(ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
+ static void getRootLogEntries(final ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
String root = getZookeeperLogLocation();
// there's a little race between getting the children and fetching
@@ -588,11 +579,10 @@ public class MetadataTableUtil {
while (true) {
result.clear();
for (String child : zoo.getChildren(root)) {
- LogEntry e = new LogEntry();
try {
- e.fromBytes(zoo.getData(root + "/" + child, null));
+ LogEntry e = LogEntry.fromBytes(zoo.getData(root + "/" + child, null));
// upgrade from !0;!0<< -> +r<<
- e.extent = RootTable.EXTENT;
+ e = new LogEntry(RootTable.EXTENT, 0, e.server, e.filename);
result.add(e);
} catch (KeeperException.NoNodeException ex) {
continue;
@@ -662,28 +652,23 @@ public class MetadataTableUtil {
return new LogEntryIterator(context);
}
- public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, List<LogEntry> logEntries, ZooLock zooLock) {
+ public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, final List<LogEntry> entries, ZooLock zooLock) {
if (extent.isRootTablet()) {
- for (LogEntry entry : logEntries) {
- String root = getZookeeperLogLocation();
- while (true) {
- try {
- IZooReaderWriter zoo = ZooReaderWriter.getInstance();
- if (zoo.isLockHeld(zooLock.getLockID())) {
- String parts[] = entry.filename.split("/");
- zoo.recursiveDelete(root + "/" + parts[parts.length - 1], NodeMissingPolicy.SKIP);
- }
- break;
- } catch (Exception e) {
- log.error("{}", e.getMessage(), e);
+ retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+ @Override
+ public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+ String root = getZookeeperLogLocation();
+ for (LogEntry entry : entries) {
+ String path = root + "/" + entry.getUniqueID();
+ log.debug("Removing " + path + " from zookeeper");
+ rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
}
- UtilWaitThread.sleep(1000);
}
- }
+ });
} else {
Mutation m = new Mutation(extent.getMetadataEntry());
- for (LogEntry entry : logEntries) {
- m.putDelete(LogColumnFamily.NAME, new Text(entry.getName()));
+ for (LogEntry entry : entries) {
+ m.putDelete(entry.getColumnFamily(), entry.getColumnQualifier());
}
update(context, zooLock, m, extent);
}
@@ -1068,4 +1053,130 @@ public class MetadataTableUtil {
return tabletEntries;
}
+  /**
+   * Records a marker saying that the given tablet server session is using the given write-ahead log.
+   * <p>
+   * For {@link TabletLevel#ROOT} the marker is a ZooKeeper node under the root tablet's current-logs path whose data
+   * is the full log path. For every other level it is a mutation in the current-logs row of the session, written to
+   * the root table when {@code level} is {@link TabletLevel#META} and to the metadata table otherwise.
+   *
+   * @param context
+   *          used to obtain the connector for the table write
+   * @param zooLock
+   *          lock that must be held for the ZooKeeper write to happen
+   * @param tabletSession
+   *          session of the tablet server using the log
+   * @param filename
+   *          path of the write-ahead log
+   * @param level
+   *          level of the tablets that are writing to the log
+   * @throws RuntimeException
+   *           wrapping the first failure seen while creating the writer, adding the mutation or closing the writer
+   */
+  public static void addNewLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename, TabletLevel level) {
+    log.debug("Adding log entry " + filename);
+    if (level == TabletLevel.ROOT) {
+      retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+        @Override
+        public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+          String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+          String uniqueId = filename.getName();
+          StringBuilder path = new StringBuilder(root);
+          path.append("/");
+          path.append(CurrentLogsSection.getRowPrefix());
+          path.append(tabletSession.toString());
+          path.append(uniqueId);
+          rw.putPersistentData(path.toString(), filename.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
+        }
+      });
+    } else {
+      Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+      m.put(CurrentLogsSection.COLF, new Text(filename.toString()), new Value(EMPTY_BYTES));
+      String tableName = MetadataTable.NAME;
+      if (level == TabletLevel.META) {
+        tableName = RootTable.NAME;
+      }
+      // Track the first failure so that a later failure in close() cannot hide it.
+      Exception failure = null;
+      BatchWriter bw = null;
+      try {
+        bw = context.getConnector().createBatchWriter(tableName, null);
+        bw.addMutation(m);
+      } catch (Exception e) {
+        failure = e;
+      } finally {
+        if (bw != null) {
+          try {
+            bw.close();
+          } catch (Exception e2) {
+            if (failure == null) {
+              failure = e2;
+            } else {
+              failure.addSuppressed(e2);
+            }
+          }
+        }
+      }
+      if (failure != null) {
+        throw new RuntimeException(failure);
+      }
+    }
+  }
+
+  /**
+   * Deletes the ZooKeeper node that marks a write-ahead log as in use by the given tablet server session. A node that
+   * is already missing is skipped.
+   */
+  private static void removeCurrentRootLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename) {
+    final ZooOperation deleteMarker = new ZooOperation() {
+      @Override
+      public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+        final String logsNode = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+        final String logName = filename.getName();
+        // node name is the row prefix, the session and the log's file name run together without separators
+        final StringBuilder marker = new StringBuilder(logsNode);
+        marker.append("/");
+        marker.append(CurrentLogsSection.getRowPrefix());
+        marker.append(tabletSession.toString());
+        marker.append(logName);
+        final String path = marker.toString();
+        log.debug("Removing entry " + path + " from zookeeper");
+        rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
+      }
+    };
+    retryZooKeeperUpdate(context, zooLock, deleteMarker);
+  }
+
+  /**
+   * Marks write-ahead logs of a tablet server session as no longer in use.
+   * <p>
+   * A marker may exist at the root and/or the meta level, so for each log the current-logs column is deleted from the
+   * root table, the same column is set to {@code UNUSED} in the metadata table, and the root-level ZooKeeper marker
+   * is removed. Nothing is written when {@code all} is empty.
+   *
+   * @param context
+   *          used to obtain the connector for the table writes
+   * @param lock
+   *          lock that must be held for the ZooKeeper removal to happen
+   * @param tabletSession
+   *          session of the tablet server that owned the logs
+   * @param all
+   *          paths of the logs to mark
+   * @throws AccumuloException
+   *           wrapping any failure while writing the mutations or closing the writers
+   */
+  public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<Path> all) throws AccumuloException {
+    // There could be a marker at the meta and/or root level, mark them both as unused
+    try {
+      if (all.isEmpty()) {
+        // avoid creating and closing two batch writers when there is nothing to write
+        return;
+      }
+      final String row = MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString();
+      BatchWriter root = null;
+      BatchWriter meta = null;
+      try {
+        root = context.getConnector().createBatchWriter(RootTable.NAME, null);
+        meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
+        for (Path fname : all) {
+          Text tname = new Text(fname.toString());
+          Mutation m = new Mutation(row);
+          m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
+          root.addMutation(m);
+          log.debug("deleting " + row + " log:" + fname);
+          m = new Mutation(row);
+          m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED);
+          meta.addMutation(m);
+          removeCurrentRootLogMarker(context, lock, tabletSession, fname);
+        }
+      } finally {
+        // nested so that a failure closing the root writer cannot leave the meta writer open
+        try {
+          if (root != null) {
+            root.close();
+          }
+        } finally {
+          if (meta != null) {
+            meta.close();
+          }
+        }
+      }
+    } catch (Exception ex) {
+      throw new AccumuloException(ex);
+    }
+  }
+
+  /**
+   * Looks up the write-ahead logs recorded for a tablet server and caches them in {@code logsForDeadServers}. Does
+   * nothing when the server already has an entry in the cache.
+   * <p>
+   * For the root tablet the logs are read from the children of the root tablet's current-logs node in ZooKeeper. For
+   * any other extent the server's current-logs row is scanned, in the root table for a meta extent and in the
+   * metadata table otherwise, and entries whose value is {@code UNUSED} are left out.
+   *
+   * @param context
+   *          used to obtain the connector for the scan
+   * @param lock
+   *          lock that must be held for the ZooKeeper read to happen
+   * @param extent
+   *          extent whose level selects where the logs are looked up
+   * @param server
+   *          tablet server whose logs are wanted
+   * @param logsForDeadServers
+   *          cache that receives the result, keyed by server
+   */
+  public static void fetchLogsForDeadServer(ClientContext context, ZooLock lock, KeyExtent extent, TServerInstance server, Map<TServerInstance,List<Path>> logsForDeadServers)
+      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    // already cached
+    if (logsForDeadServers.containsKey(server)) {
+      return;
+    }
+    if (extent.isRootTablet()) {
+      final List<Path> logs = new ArrayList<>();
+      // NOTE(review): every child of the node is collected, with no filtering by server - confirm that is intended
+      retryZooKeeperUpdate(context, lock, new ZooOperation() {
+        @Override
+        public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+          String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+          // a failed attempt may have added some entries already; start over on each try
+          logs.clear();
+          for (String child : rw.getChildren(root)) {
+            logs.add(new Path(new String(rw.getData(root + "/" + child, null), UTF_8)));
+          }
+        }
+      });
+      logsForDeadServers.put(server, logs);
+    } else {
+      // use the correct meta table
+      String table = MetadataTable.NAME;
+      if (extent.isMeta()) {
+        table = RootTable.NAME;
+      }
+      // fetch the current logs in use, and put them in the cache
+      List<Path> logs = new ArrayList<>();
+      Scanner scanner = context.getConnector().createScanner(table, Authorizations.EMPTY);
+      try {
+        scanner.setRange(new Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString()));
+        Text path = new Text();
+        for (Entry<Key,Value> entry : scanner) {
+          MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+          if (!entry.getValue().equals(MetadataSchema.CurrentLogsSection.UNUSED)) {
+            logs.add(new Path(path.toString()));
+          }
+        }
+      } finally {
+        // release the scanner whether or not the scan succeeded
+        scanner.close();
+      }
+      logsForDeadServers.put(server, logs);
+    }
+  }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
index 8e755a3..c6d5ce4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.server.util;
-import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -176,20 +175,14 @@ public class ReplicationTableUtil {
/**
* Write replication ingest entries for each provided file with the given {@link Status}.
*/
- public static void updateFiles(ClientContext context, KeyExtent extent, Collection<String> files, Status stat) {
+ public static void updateFiles(ClientContext context, KeyExtent extent, String file, Status stat) {
if (log.isDebugEnabled()) {
- log.debug("Updating replication status for " + extent + " with " + files + " using " + ProtobufUtil.toString(stat));
+ log.debug("Updating replication status for " + extent + " with " + file + " using " + ProtobufUtil.toString(stat));
}
// TODO could use batch writer, would need to handle failure and retry like update does - ACCUMULO-1294
- if (files.isEmpty()) {
- return;
- }
Value v = ProtobufUtil.toValue(stat);
- for (String file : files) {
- // TODO Can preclude this addition if the extent is for a table we don't need to replicate
- update(context, createUpdateMutation(new Path(file), v, extent), extent);
- }
+ update(context, createUpdateMutation(new Path(file), v, extent), extent);
}
static Mutation createUpdateMutation(Path file, Value v, KeyExtent extent) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
index 3983bde..04a83d3 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
@@ -91,7 +91,7 @@ public class ReplicationTableUtilTest {
String myFile = "file:////home/user/accumulo/wal/server+port/" + uuid;
long createdTime = System.currentTimeMillis();
- ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), Collections.singleton(myFile), StatusUtil.fileCreated(createdTime));
+ ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), myFile, StatusUtil.fileCreated(createdTime));
verify(writer);
[2/5] accumulo git commit: ACCUMULO-3423 optimize WAL metadata table
updates
Posted by ec...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index afd3454..aeb73b4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.tserver;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
@@ -30,6 +29,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -45,6 +45,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
@@ -147,8 +148,8 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.GarbageCollectionLogger;
-import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.TabletLevel;
import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
@@ -1440,6 +1441,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
}
+
@Override
public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, final TKeyExtent textent) {
@@ -1500,6 +1502,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
final AssignmentHandler ah = new AssignmentHandler(extent);
// final Runnable ah = new LoggingRunnable(log, );
// Root tablet assignment must take place immediately
+
if (extent.isRootTablet()) {
new Daemon("Root Tablet Assignment") {
@Override
@@ -1692,66 +1695,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
@Override
- public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
- String myname = getClientAddressString();
- myname = myname.replace(':', '+');
- Set<String> loggers = new HashSet<String>();
- logger.getLogFiles(loggers);
- Set<String> loggerUUIDs = new HashSet<String>();
- for (String logger : loggers)
- loggerUUIDs.add(new Path(logger).getName());
-
- nextFile: for (String filename : filenames) {
- String uuid = new Path(filename).getName();
- // skip any log we're currently using
- if (loggerUUIDs.contains(uuid))
- continue nextFile;
-
- List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
- synchronized (onlineTablets) {
- onlineTabletsCopy.addAll(onlineTablets.values());
- }
- for (Tablet tablet : onlineTabletsCopy) {
- for (String current : tablet.getCurrentLogFiles()) {
- if (current.contains(uuid)) {
- log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
- continue nextFile;
- }
- }
- }
-
- try {
- Path source = new Path(filename);
- if (TabletServer.this.getConfiguration().getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
- Path walogArchive = fs.matchingFileSystem(source, ServerConstants.getWalogArchives());
- fs.mkdirs(walogArchive);
- Path dest = new Path(walogArchive, source.getName());
- log.info("Archiving walog " + source + " to " + dest);
- if (!fs.rename(source, dest))
- log.error("rename is unsuccessful");
- } else {
- log.info("Deleting walog " + filename);
- Path sourcePath = new Path(filename);
- if (!(!TabletServer.this.getConfiguration().getBoolean(Property.GC_TRASH_IGNORE) && fs.moveToTrash(sourcePath))
- && !fs.deleteRecursively(sourcePath))
- log.warn("Failed to delete walog " + source);
- for (String recovery : ServerConstants.getRecoveryDirs()) {
- Path recoveryPath = new Path(recovery, source.getName());
- try {
- if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath))
- log.info("Deleted any recovery log " + filename);
- } catch (FileNotFoundException ex) {
- // ignore
- }
- }
- }
- } catch (IOException e) {
- log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
- }
- }
- }
-
- @Override
public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
try {
checkPermission(credentials, null, "getActiveCompactions");
@@ -1772,14 +1715,20 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
@Override
public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException {
- Set<String> logs = new HashSet<String>();
- logger.getLogFiles(logs);
- return new ArrayList<String>(logs);
+ String log = logger.getLogFile();
+ return Collections.singletonList(log);
+ }
+
+    /**
+     * Removes nothing: the request is answered only with two warnings telling the operator that the caller is
+     * probably an out-of-date garbage collector.
+     */
+    @Override
+    public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
+      final String advice = "This is probably because your file Garbage Collector is an older version than your tablet servers.\n" +
+          "Restart your file Garbage Collector.";
+      log.warn("Garbage collector is attempting to remove logs through the tablet server");
+      log.warn(advice);
+    }
}
private class SplitRunner implements Runnable {
- private Tablet tablet;
+ private final Tablet tablet;
public SplitRunner(Tablet tablet) {
this.tablet = tablet;
@@ -2033,7 +1982,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
log.error("Unexpected error ", e);
}
log.debug("Unassigning " + tls);
- TabletStateStore.unassign(TabletServer.this, tls);
+ TabletStateStore.unassign(TabletServer.this, tls, null);
} catch (DistributedStoreException ex) {
log.warn("Unable to update storage", ex);
} catch (KeeperException e) {
@@ -2243,29 +2192,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
}
- public void addLoggersToMetadata(List<DfsLogger> logs, KeyExtent extent, int id) {
- if (!this.onlineTablets.containsKey(extent)) {
- log.info("Not adding " + logs.size() + " logs for extent " + extent + " as alias " + id + " tablet is offline");
- // minor compaction due to recovery... don't make updates... if it finishes, there will be no WALs,
- // if it doesn't, we'll need to do the same recovery with the old files.
- return;
- }
-
- log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id);
- long now = RelativeTime.currentTimeMillis();
- List<String> logSet = new ArrayList<String>();
- for (DfsLogger log : logs)
- logSet.add(log.getFileName());
- LogEntry entry = new LogEntry();
- entry.extent = extent;
- entry.tabletId = id;
- entry.timestamp = now;
- entry.server = logs.get(0).getLogger();
- entry.filename = logs.get(0).getFileName();
- entry.logSet = logSet;
- MetadataTableUtil.addLogEntry(this, entry, getLock());
- }
-
private HostAndPort startServer(AccumuloConfiguration conf, String address, Property portHint, TProcessor processor, String threadName)
throws UnknownHostException {
Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
@@ -2984,6 +2910,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
totalMinorCompactions.incrementAndGet();
logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
+ markUnusedWALs();
}
public void minorCompactionStarted(CommitSession tablet, int lastUpdateSequence, String newMapfileLocation) throws IOException {
@@ -3002,14 +2929,11 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
});
for (LogEntry entry : sorted) {
Path recovery = null;
- for (String log : entry.logSet) {
- Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log));
- finished = SortedLogState.getFinishedMarkerPath(finished);
- TabletServer.log.info("Looking for " + finished);
- if (fs.exists(finished)) {
- recovery = finished.getParent();
- break;
- }
+ Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, entry.filename));
+ finished = SortedLogState.getFinishedMarkerPath(finished);
+ TabletServer.log.info("Looking for " + finished);
+ if (fs.exists(finished)) {
+ recovery = finished.getParent();
}
if (recovery == null)
throw new IOException("Unable to find recovery files for extent " + extent + " logEntry: " + entry);
@@ -3046,7 +2970,9 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
}
public Collection<Tablet> getOnlineTablets() {
- return Collections.unmodifiableCollection(onlineTablets.values());
+ synchronized (onlineTablets) {
+ return new ArrayList<Tablet>(onlineTablets.values());
+ }
}
public VolumeManager getFileSystem() {
@@ -3072,4 +2998,62 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
public SecurityOperation getSecurityOperation() {
return security;
}
+
+  // For each write-ahead log, the tablet levels that already have a marker written;
+  // consulted to avoid unnecessary redundant markings to meta
+  final ConcurrentHashMap<DfsLogger,EnumSet<TabletLevel>> metadataTableLogs = new ConcurrentHashMap<>();
+  // one monitor object per TabletLevel value
+  final Object[] levelLocks = new Object[TabletLevel.values().length];
+  {
+    int slot = 0;
+    while (slot < levelLocks.length) {
+      levelLocks[slot] = new Object();
+      slot++;
+    }
+  }
+
+
+  // logs that have been closed; their meta entries are removed once no tablet references them any more
+  Set<DfsLogger> closedLogs = new HashSet<>();
+
+  /**
+   * Marks closed write-ahead logs that no online tablet lists among its current log files as unused, and drops them
+   * from {@code closedLogs} once the marking succeeded. A failure to mark is logged and the logs stay in
+   * {@code closedLogs}, so they are considered again on the next call.
+   */
+  private void markUnusedWALs() {
+    Set<DfsLogger> candidates;
+    synchronized (closedLogs) {
+      candidates = new HashSet<>(closedLogs);
+    }
+    for (Tablet tablet : getOnlineTablets()) {
+      candidates.removeAll(tablet.getCurrentLogFiles());
+    }
+    if (candidates.isEmpty()) {
+      // nothing to mark: skip the metadata writers entirely
+      return;
+    }
+    try {
+      Set<Path> filenames = new HashSet<>();
+      for (DfsLogger candidate : candidates) {
+        filenames.add(candidate.getPath());
+      }
+      MetadataTableUtil.markLogUnused(this, this.getLock(), this.getTabletSession(), filenames);
+      synchronized (closedLogs) {
+        closedLogs.removeAll(candidates);
+      }
+    } catch (AccumuloException ex) {
+      log.info(ex.toString(), ex);
+    }
+  }
+
+  /**
+   * Writes the "log in use" marker for a write-ahead log at the given tablet level, unless this server has already
+   * recorded one for that log and level. Markers for {@link TabletLevel#ROOT} are always written.
+   *
+   * @param copy
+   *          the log that is about to be used
+   * @param level
+   *          level of the tablet that will write to the log
+   */
+  public void addLoggersToMetadata(DfsLogger copy, TabletLevel level) {
+    // serialize the updates to the metadata per level: avoids updating the level more than once
+    // updating one level, may cause updates to other levels, so we need to release the lock on metadataTableLogs
+    synchronized (levelLocks[level.ordinal()]) {
+      EnumSet<TabletLevel> set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level));
+      if (set == null || !set.contains(level) || level == TabletLevel.ROOT) {
+        log.info("Writing log marker for level " + level + " " + copy.getFileName());
+        MetadataTableUtil.addNewLogMarker(this, this.getLock(), this.getTabletSession(), copy.getPath(), level);
+      }
+      set = metadataTableLogs.get(copy);
+      // walogClosed() may have removed the entry while the marker was being written; nothing is left to record then
+      if (set != null) {
+        // NOTE(review): the EnumSet is shared between threads holding different level locks - confirm this is safe
+        set.add(level);
+      }
+    }
+  }
+
+  /**
+   * Notes that a write-ahead log has been closed: its per-level marker bookkeeping is dropped and the log is added to
+   * {@code closedLogs}.
+   */
+  public void walogClosed(DfsLogger currentLog) {
+    // forget which levels were marked for this log
+    metadataTableLogs.remove(currentLog);
+    synchronized (closedLogs) {
+      closedLogs.add(currentLog);
+    }
+  }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 8512690..cd7ce08 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -72,7 +72,7 @@ import com.google.common.base.Optional;
* Wrap a connection to a logger.
*
*/
-public class DfsLogger {
+public class DfsLogger implements Comparable<DfsLogger> {
public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
@@ -371,6 +371,7 @@ public class DfsLogger {
public synchronized void open(String address) throws IOException {
String filename = UUID.randomUUID().toString();
+ log.debug("Address is " + address);
String logger = Joiner.on("+").join(address.split(":"));
log.debug("DfsLogger.open() begin");
@@ -463,7 +464,11 @@ public class DfsLogger {
}
public String getFileName() {
- return logPath.toString();
+ return logPath;
+ }
+
+ public Path getPath() {
+ return new Path(logPath);
}
public void close() throws IOException {
@@ -609,4 +614,9 @@ public class DfsLogger {
return Joiner.on(":").join(parts[parts.length - 2].split("[+]"));
}
+  /**
+   * Orders loggers by the string returned from {@link #getFileName()}.
+   */
+  @Override
+  public int compareTo(DfsLogger o) {
+    final String mine = getFileName();
+    final String theirs = o.getFileName();
+    return mine.compareTo(theirs);
+  }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
index 37882cd..ab3dea2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -180,7 +180,7 @@ public class SortedLogRecovery {
// find the maximum tablet id... because a tablet may leave a tserver and then come back, in which case it would have a different tablet id
// for the maximum tablet id, find the minimum sequence #... may be ok to find the max seq, but just want to make the code behave like it used to
while (reader.next(key, value)) {
- // LogReader.printEntry(entry);
+ // log.debug("Event " + key.event + " tablet " + key.tablet);
if (key.event != DEFINE_TABLET)
break;
if (key.tablet.equals(extent) || key.tablet.equals(alternative)) {
@@ -209,7 +209,7 @@ public class SortedLogRecovery {
if (lastStartToFinish.compactionStatus == Status.INITIAL)
lastStartToFinish.compactionStatus = Status.COMPLETE;
if (key.seq <= lastStartToFinish.lastStart)
- throw new RuntimeException("Sequence numbers are not increasing for start/stop events.");
+ throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart);
lastStartToFinish.update(fileno, key.seq);
// Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table.
@@ -218,7 +218,7 @@ public class SortedLogRecovery {
lastStartToFinish.update(-1);
} else if (key.event == COMPACTION_FINISH) {
if (key.seq <= lastStartToFinish.lastStart)
- throw new RuntimeException("Sequence numbers are not increasing for start/stop events.");
+ throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart);
if (lastStartToFinish.compactionStatus == Status.INITIAL)
lastStartToFinish.compactionStatus = Status.LOOKING_FOR_FINISH;
else if (lastStartToFinish.lastFinish > lastStartToFinish.lastStart)
@@ -249,8 +249,6 @@ public class SortedLogRecovery {
break;
if (key.tid != tid)
break;
- // log.info("Replaying " + key);
- // log.info(value);
if (key.event == MUTATION) {
mr.receive(value.mutations.get(0));
} else if (key.event == MANY_MUTATIONS) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 1d385d9..bc77ffb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -21,14 +21,16 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -37,7 +39,9 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
+import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.TabletLevel;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.replication.StatusUtil;
@@ -72,20 +76,22 @@ public class TabletServerLogger {
private final TabletServer tserver;
- // The current log set: always updated to a new set with every change of loggers
- private final List<DfsLogger> loggers = new ArrayList<DfsLogger>();
+ // The current logger
+ private DfsLogger currentLog = null;
+ private final SynchronousQueue<Object> nextLog = new SynchronousQueue<>();
+ private ThreadPoolExecutor nextLogMaker;
- // The current generation of logSet.
- // Because multiple threads can be using a log set at one time, a log
+ // The current generation of logs.
+ // Because multiple threads can be using a log at one time, a log
// failure is likely to affect multiple threads, who will all attempt to
- // create a new logSet. This will cause many unnecessary updates to the
+ // create a new log. This will cause many unnecessary updates to the
// metadata table.
// We'll use this generational counter to determine if another thread has
- // already fetched a new logSet.
- private AtomicInteger logSetId = new AtomicInteger();
+ // already fetched a new log.
+ private final AtomicInteger logId = new AtomicInteger();
// Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write lock to change them
- private final ReentrantReadWriteLock logSetLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock logIdLock = new ReentrantReadWriteLock();
private final AtomicInteger seqGen = new AtomicInteger();
@@ -146,62 +152,66 @@ public class TabletServerLogger {
this.flushCounter = flushCounter;
}
- private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
- final int[] result = {-1};
- testLockAndRun(logSetLock, new TestCallWithWriteLock() {
+ private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException {
+ final AtomicReference<DfsLogger> result = new AtomicReference<DfsLogger>();
+ testLockAndRun(logIdLock, new TestCallWithWriteLock() {
@Override
boolean test() {
- copy.clear();
- copy.addAll(loggers);
- if (!loggers.isEmpty())
- result[0] = logSetId.get();
- return loggers.isEmpty();
+ result.set(currentLog);
+ if (currentLog != null)
+ logIdOut.set(logId.get());
+ return currentLog == null;
}
@Override
void withWriteLock() throws IOException {
try {
- createLoggers();
- copy.clear();
- copy.addAll(loggers);
- if (copy.size() > 0)
- result[0] = logSetId.get();
+ createLogger();
+ result.set(currentLog);
+ if (currentLog != null)
+ logIdOut.set(logId.get());
else
- result[0] = -1;
+ logIdOut.set(-1);
} catch (IOException e) {
log.error("Unable to create loggers", e);
}
}
});
- return result[0];
+ return result.get();
}
- public void getLogFiles(Set<String> loggersOut) {
- logSetLock.readLock().lock();
+ public String getLogFile() {
+ logIdLock.readLock().lock();
try {
- for (DfsLogger logger : loggers) {
- loggersOut.add(logger.getFileName());
- }
+ return currentLog.getFileName();
} finally {
- logSetLock.readLock().unlock();
+ logIdLock.readLock().unlock();
}
}
- synchronized private void createLoggers() throws IOException {
- if (!logSetLock.isWriteLockedByCurrentThread()) {
+ synchronized private void createLogger() throws IOException {
+ if (!logIdLock.isWriteLockedByCurrentThread()) {
throw new IllegalStateException("createLoggers should be called with write lock held!");
}
- if (loggers.size() != 0) {
- throw new IllegalStateException("createLoggers should not be called when loggers.size() is " + loggers.size());
+ if (currentLog != null) {
+ throw new IllegalStateException("createLoggers should not be called when current log is set");
}
try {
- DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
- alog.open(tserver.getClientAddressString());
- loggers.add(alog);
- logSetId.incrementAndGet();
- return;
+ startLogMaker();
+ Object next = nextLog.take();
+ if (next instanceof Exception) {
+ throw (Exception)next;
+ }
+ if (next instanceof DfsLogger) {
+ currentLog = (DfsLogger)next;
+ logId.incrementAndGet();
+ log.info("Using next log " + currentLog.getFileName());
+ return;
+ } else {
+ throw new RuntimeException("Error: unexpected type seen: " + next);
+ }
} catch (Exception t) {
walErrors.put(System.currentTimeMillis(), "");
if (walErrors.size() >= HALT_AFTER_ERROR_COUNT) {
@@ -211,22 +221,63 @@ public class TabletServerLogger {
}
}
+ private synchronized void startLogMaker() { // starts the background "WALog creator" task once; later calls return immediately
+ if (nextLogMaker != null) {
+ return;
+ }
+ nextLogMaker = new SimpleThreadPool(1, "WALog creator");
+ nextLogMaker.submit(new Runnable() {
+ @Override
+ public void run() {
+ while (!nextLogMaker.isShutdown()) { // keep producing logs until the pool is shut down
+ try {
+ log.debug("Creating next WAL");
+ DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
+ alog.open(tserver.getClientAddressString());
+ log.debug("Created next WAL " + alog.getFileName());
+ while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) { // nextLog is a SynchronousQueue: waits for a taker, re-offering the same log after each 12-hour timeout
+ log.info("Our WAL was not used for 12 hours: " + alog.getFileName());
+ }
+ } catch (Exception t) {
+ log.error("{}", t.getMessage(), t);
+ try {
+ nextLog.offer(t, 12, TimeUnit.HOURS); // hand the failure to whoever takes from nextLog; the result is unchecked, so it is dropped if nobody takes it in 12 hours
+ } catch (InterruptedException ex) {
+ // ignore: the loop condition is re-checked and the task either exits or tries again
+ }
+ }
+ }
+ }
+ });
+ }
+
+ public void resetLoggers() throws IOException { // closes the current log, if any, while holding the write lock
+ logIdLock.writeLock().lock();
+ try {
+ close(); // close() throws IllegalStateException unless this thread holds the write lock; it sets currentLog to null
+ } finally {
+ logIdLock.writeLock().unlock();
+ }
+ }
+
synchronized private void close() throws IOException {
- if (!logSetLock.isWriteLockedByCurrentThread()) {
+ if (!logIdLock.isWriteLockedByCurrentThread()) {
throw new IllegalStateException("close should be called with write lock held!");
}
try {
- for (DfsLogger logger : loggers) {
+ if (null != currentLog) {
try {
- logger.close();
+ currentLog.close();
} catch (DfsLogger.LogClosedException ex) {
// ignore
} catch (Throwable ex) {
- log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex, ex);
+ log.error("Unable to cleanly close log " + currentLog.getFileName() + ": " + ex, ex);
+ } finally {
+ this.tserver.walogClosed(currentLog);
}
+ currentLog = null;
+ logSizeEstimate.set(0);
}
- loggers.clear();
- logSizeEstimate.set(0);
} catch (Throwable t) {
throw new IOException(t);
}
@@ -243,7 +294,7 @@ public class TabletServerLogger {
private int write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException {
// Work very hard not to lock this during calls to the outside world
- int currentLogSet = logSetId.get();
+ int currentLogId = logId.get();
int seq = -1;
int attempt = 1;
@@ -251,20 +302,22 @@ public class TabletServerLogger {
while (!success) {
try {
// get a reference to the loggers that no other thread can touch
- ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>();
- currentLogSet = initializeLoggers(copy);
+ DfsLogger copy = null;
+ AtomicInteger currentId = new AtomicInteger(-1);
+ copy = initializeLoggers(currentId);
+ currentLogId = currentId.get();
// add the logger to the log set for the memory in the tablet,
// update the metadata table if we've never used this tablet
- if (currentLogSet == logSetId.get()) {
+ if (currentLogId == logId.get()) {
for (CommitSession commitSession : sessions) {
if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
try {
// Scribble out a tablet definition and then write to the metadata table
defineTablet(commitSession);
- if (currentLogSet == logSetId.get())
- tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId());
+ if (currentLogId == logId.get())
+ tserver.addLoggersToMetadata(copy, TabletLevel.getLevel(commitSession.getExtent()));
} finally {
commitSession.finishUpdatingLogsUsed();
}
@@ -272,39 +325,29 @@ public class TabletServerLogger {
// Need to release
KeyExtent extent = commitSession.getExtent();
if (ReplicationConfigurationUtil.isEnabled(extent, tserver.getTableConfiguration(extent))) {
- Set<String> logs = new HashSet<String>();
- for (DfsLogger logger : copy) {
- logs.add(logger.getFileName());
- }
- Status status = StatusUtil.fileCreated(System.currentTimeMillis());
- log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + logs);
+ Status status = StatusUtil.openWithUnknownLength(System.currentTimeMillis());
+ log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + copy.getFileName());
// Got some new WALs, note this in the metadata table
- ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), logs, status);
+ ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), copy.getFileName(), status);
}
}
}
}
// Make sure that the logs haven't changed out from underneath our copy
- if (currentLogSet == logSetId.get()) {
+ if (currentLogId == logId.get()) {
// write the mutation to the logs
seq = seqGen.incrementAndGet();
if (seq < 0)
throw new RuntimeException("Logger sequence generator wrapped! Onos!!!11!eleven");
- ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
- for (DfsLogger wal : copy) {
- LoggerOperation lop = writer.write(wal, seq);
- if (lop != null)
- queuedOperations.add(lop);
- }
-
- for (LoggerOperation lop : queuedOperations) {
+ LoggerOperation lop = writer.write(copy, seq);
+ if (lop != null) {
lop.await();
}
// double-check: did the log set change?
- success = (currentLogSet == logSetId.get());
+ success = (currentLogId == logId.get());
}
} catch (DfsLogger.LogClosedException ex) {
log.debug("Logs closed while writing, retrying " + attempt);
@@ -319,13 +362,13 @@ public class TabletServerLogger {
// Some sort of write failure occurred. Grab the write lock and reset the logs.
// But since multiple threads will attempt it, only attempt the reset when
// the logs haven't changed.
- final int finalCurrent = currentLogSet;
+ final int finalCurrent = currentLogId;
if (!success) {
- testLockAndRun(logSetLock, new TestCallWithWriteLock() {
+ testLockAndRun(logIdLock, new TestCallWithWriteLock() {
@Override
boolean test() {
- return finalCurrent == logSetId.get();
+ return finalCurrent == logId.get();
}
@Override
@@ -338,7 +381,7 @@ public class TabletServerLogger {
}
// if the log gets too big, reset it .. grab the write lock first
logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead
- testLockAndRun(logSetLock, new TestCallWithWriteLock() {
+ testLockAndRun(logIdLock, new TestCallWithWriteLock() {
@Override
boolean test() {
return logSizeEstimate.get() > maxSize;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
index d908f1d..dee705c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.tserver.tablet;
-import java.util.ArrayList;
import java.util.List;
import org.apache.accumulo.core.data.Mutation;
@@ -86,7 +85,7 @@ public class CommitSession {
return committer;
}
- public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
+ public boolean beginUpdatingLogsUsed(DfsLogger copy, boolean mincFinish) {
return committer.beginUpdatingLogsUsed(memTable, copy, mincFinish);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index db1b418..ab15ccc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -424,7 +424,9 @@ class DatafileManager {
if (log.isDebugEnabled()) {
log.debug("Recording that data has been ingested into " + tablet.getExtent() + " using " + logFileOnly);
}
- ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFileOnly, StatusUtil.openWithUnknownLength());
+ for (String logFile : logFileOnly) {
+ ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFile, StatusUtil.openWithUnknownLength());
+ }
}
} finally {
tablet.finishClearingUnusedLogs();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 1f4625b..17864be 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -37,6 +37,7 @@ import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -200,7 +201,7 @@ public class Tablet implements TabletCommitter {
}
// stores info about user initiated major compaction that is waiting on a minor compaction to finish
- private CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo();
+ private final CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo();
static enum CompactionState {
WAITING_TO_START, IN_PROGRESS
@@ -627,8 +628,8 @@ public class Tablet implements TabletCommitter {
// the WAL isn't closed (WRT replication Status) and thus we're safe to update its progress.
Status status = StatusUtil.openWithUnknownLength();
for (LogEntry logEntry : logEntries) {
- log.debug("Writing updated status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));
- ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.logSet, status);
+ log.debug("Writing updated status to metadata table for " + logEntry.filename + " " + ProtobufUtil.toString(status));
+ ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.filename, status);
}
}
@@ -640,11 +641,9 @@ public class Tablet implements TabletCommitter {
}
}
// make some closed references that represent the recovered logs
- currentLogs = new HashSet<DfsLogger>();
+ currentLogs = new ConcurrentSkipListSet<DfsLogger>();
for (LogEntry logEntry : logEntries) {
- for (String log : logEntry.logSet) {
- currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log, logEntry.getColumnQualifier().toString()));
- }
+ currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.filename, logEntry.getColumnQualifier().toString()));
}
log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + getTabletMemory().getNumEntries()
@@ -935,7 +934,9 @@ public class Tablet implements TabletCommitter {
long count = 0;
+ String oldName = Thread.currentThread().getName();
try {
+ Thread.currentThread().setName("Minor compacting " + this.extent);
Span span = Trace.start("write");
CompactionStats stats;
try {
@@ -966,6 +967,7 @@ public class Tablet implements TabletCommitter {
failed = true;
throw new RuntimeException(e);
} finally {
+ Thread.currentThread().setName(oldName);
try {
getTabletMemory().finalizeMinC();
} catch (Throwable t) {
@@ -990,7 +992,7 @@ public class Tablet implements TabletCommitter {
private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) {
CommitSession oldCommitSession = getTabletMemory().prepareForMinC();
otherLogs = currentLogs;
- currentLogs = new HashSet<DfsLogger>();
+ currentLogs = new ConcurrentSkipListSet<DfsLogger>();
FileRef mergeFile = null;
if (mincReason != MinorCompactionReason.RECOVERY) {
@@ -2374,14 +2376,11 @@ public class Tablet implements TabletCommitter {
}
}
- private Set<DfsLogger> currentLogs = new HashSet<DfsLogger>();
+ private ConcurrentSkipListSet<DfsLogger> currentLogs = new ConcurrentSkipListSet<DfsLogger>();
- public synchronized Set<String> getCurrentLogFiles() {
- Set<String> result = new HashSet<String>();
- for (DfsLogger log : currentLogs) {
- result.add(log.getFileName());
- }
- return result;
+ // currentLogs may be updated while a tablet is otherwise locked
+ public Set<DfsLogger> getCurrentLogFiles() {
+ return new HashSet<DfsLogger>(currentLogs);
}
Set<String> beginClearingUnusedLogs() {
@@ -2440,13 +2439,13 @@ public class Tablet implements TabletCommitter {
// this lock is basically used to synchronize writing of log info to metadata
private final ReentrantLock logLock = new ReentrantLock();
- public synchronized int getLogCount() {
+ public int getLogCount() {
return currentLogs.size();
}
// don't release the lock if this method returns true for success; instead, the caller should clean up by calling finishUpdatingLogsUsed()
@Override
- public boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> more, boolean mincFinish) {
+ public boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger more, boolean mincFinish) {
boolean releaseLock = true;
@@ -2483,28 +2482,26 @@ public class Tablet implements TabletCommitter {
int numAdded = 0;
int numContained = 0;
- for (DfsLogger logger : more) {
- if (addToOther) {
- if (otherLogs.add(logger))
- numAdded++;
+ if (addToOther) {
+ if (otherLogs.add(more))
+ numAdded++;
- if (currentLogs.contains(logger))
- numContained++;
- } else {
- if (currentLogs.add(logger))
- numAdded++;
+ if (currentLogs.contains(more))
+ numContained++;
+ } else {
+ if (currentLogs.add(more))
+ numAdded++;
- if (otherLogs.contains(logger))
- numContained++;
- }
+ if (otherLogs.contains(more))
+ numContained++;
}
- if (numAdded > 0 && numAdded != more.size()) {
+ if (numAdded > 0 && numAdded != 1) {
// expect to add all or none
throw new IllegalArgumentException("Added subset of logs " + extent + " " + more + " " + currentLogs);
}
- if (numContained > 0 && numContained != more.size()) {
+ if (numContained > 0 && numContained != 1) {
// expect to contain all or none
throw new IllegalArgumentException("Other logs contained subset of logs " + extent + " " + more + " " + otherLogs);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
index c7e3a66..934ce20 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.tserver.tablet;
-import java.util.Collection;
import java.util.List;
import org.apache.accumulo.core.client.Durability;
@@ -38,7 +37,7 @@ public interface TabletCommitter {
/**
* If this method returns true, the caller must call {@link #finishUpdatingLogsUsed()} to clean up
*/
- boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> copy, boolean mincFinish);
+ boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger copy, boolean mincFinish);
void finishUpdatingLogsUsed();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
new file mode 100644
index 0000000..44058d3
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class LogEntryTest {
+ // Builds one LogEntry, then checks its fields, its metadata column views, and both decode paths.
+ @Test
+ public void test() throws Exception {
+ final String walServer = "localhost:1234";
+ final String walFile = "default/foo";
+ final long stamp = 12345678L;
+ final KeyExtent tablet = new KeyExtent(new Text("1"), null, new Text(""));
+ final LogEntry original = new LogEntry(tablet, stamp, walServer, walFile);
+ assertEquals(tablet, original.extent);
+ assertEquals(walServer, original.server);
+ assertEquals(walFile, original.filename);
+ assertEquals(stamp, original.timestamp);
+ assertEquals("1<; default/foo", original.toString());
+ assertEquals(new Text("log"), original.getColumnFamily());
+ assertEquals(new Text("localhost:1234/default/foo"), original.getColumnQualifier());
+ final LogEntry decodedFromBytes = LogEntry.fromBytes(original.toBytes());
+ assertEquals(original.toString(), decodedFromBytes.toString());
+ final Key metadataKey = new Key(new Text("1<"), new Text("log"), new Text("localhost:1234/default/foo"));
+ metadataKey.setTimestamp(stamp);
+ final LogEntry decodedFromKeyValue = LogEntry.fromKeyValue(metadataKey, original.getValue());
+ assertEquals(original.toString(), decodedFromKeyValue.toString());
+ assertEquals(original.timestamp, decodedFromKeyValue.timestamp);
+ assertEquals("foo", original.getUniqueID());
+ assertEquals("localhost:1234/default/foo", original.getName());
+ assertEquals(new Value("default/foo".getBytes()), original.getValue());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index d0de29f..1186c68 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -202,9 +202,6 @@ public class NullTserver {
}
@Override
- public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {}
-
- @Override
public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
return new ArrayList<ActiveCompaction>();
}
@@ -231,6 +228,9 @@ public class NullTserver {
public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException {
return null;
}
+
+ @Override
+ public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { } // no-op: the body is empty, so no log is removed
}
static class Opts extends Help {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
index 404a8fd..81e25cc 100644
--- a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
+++ b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
@@ -60,6 +60,11 @@ import com.google.common.net.HostAndPort;
public class ProxyDurabilityIT extends ConfigurableMacIT {
@Override
+ protected int defaultTimeoutSeconds() { // overrides the inherited value; 60, in seconds going by the method name
+ return 60;
+ }
+
+ @Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s");
@@ -111,7 +116,7 @@ public class ProxyDurabilityIT extends ConfigurableMacIT {
assertEquals(0, count(tableName));
ConditionalWriterOptions cfg = new ConditionalWriterOptions();
- cfg.setDurability(Durability.LOG);
+ cfg.setDurability(Durability.SYNC);
String cwriter = client.createConditionalWriter(login, tableName, cfg);
ConditionalUpdates updates = new ConditionalUpdates();
updates.addToConditions(new Condition(new Column(bytes("cf"), bytes("cq"), bytes(""))));
@@ -120,7 +125,7 @@ public class ProxyDurabilityIT extends ConfigurableMacIT {
assertEquals(ConditionalStatus.ACCEPTED, status.get(bytes("row")));
assertEquals(1, count(tableName));
restartTServer();
- assertEquals(0, count(tableName));
+ assertEquals(1, count(tableName));
proxyServer.stop();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
index 25337b2..0dcdf42 100644
--- a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
@@ -54,7 +54,7 @@ public class BadDeleteMarkersCreatedIT extends AccumuloClusterIT {
@Override
public int defaultTimeoutSeconds() {
- return 60;
+ return 120;
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
index f793925..8703f18 100644
--- a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
@@ -20,25 +20,33 @@ import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.harness.AccumuloClusterIT;
import org.apache.hadoop.io.Text;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class BalanceIT extends ConfigurableMacIT {
+public class BalanceIT extends AccumuloClusterIT {
+ private static final Logger log = LoggerFactory.getLogger(BalanceIT.class);
- @Test(timeout = 60 * 1000)
+ @Override
+ public int defaultTimeoutSeconds() { // added in place of the removed @Test(timeout = 60 * 1000) on testBalance
+ return 60;
+ }
+
+ @Test
public void testBalance() throws Exception {
String tableName = getUniqueNames(1)[0];
Connector c = getConnector();
- System.out.println("Creating table");
+ log.info("Creating table");
c.tableOperations().create(tableName);
SortedSet<Text> splits = new TreeSet<Text>();
for (int i = 0; i < 10; i++) {
splits.add(new Text("" + i));
}
- System.out.println("Adding splits");
+ log.info("Adding splits");
c.tableOperations().addSplits(tableName, splits);
- System.out.println("Waiting for balance");
+ log.info("Waiting for balance");
c.instanceOperations().waitForBalance();
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
index f553be8..fcad293 100644
--- a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
@@ -128,6 +128,7 @@ public class CleanWalIT extends AccumuloClusterIT {
private int countLogs(String tableName, Connector conn) throws TableNotFoundException {
Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+ scanner.setRange(MetadataSchema.TabletsSection.getRange());
int count = 0;
for (Entry<Key,Value> entry : scanner) {
log.debug("Saw " + entry.getKey() + "=" + entry.getValue());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
index b7637a6..65be396 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
@@ -1294,6 +1294,7 @@ public class ConditionalWriterIT extends AccumuloClusterIT {
conn.tableOperations().create(tableName);
DistributedTrace.enable("localhost", "testTrace", mac.getClientConfig());
+ UtilWaitThread.sleep(1000);
Span root = Trace.on("traceTest");
ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java b/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
new file mode 100644
index 0000000..0324e4a
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class GarbageCollectWALIT extends ConfigurableMacIT {
+ // Cluster setup: 5s ZooKeeper timeout, 1s GC start and cycle delay, one tablet server, raw local file system.
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); // a duration belongs on the ZooKeeper timeout property, not on the host property
+ cfg.setProperty(Property.GC_CYCLE_START, "1s");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+ cfg.setNumTservers(1);
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+ // Expects two WAL files on disk after table creation, and still two after a tablet server restart with the GC running.
+ @Test(timeout = 2 * 60 * 1000)
+ public void test() throws Exception {
+ // keep the garbage collector stopped until the tablet server has been stopped
+ String tableName = getUniqueNames(1)[0];
+ cluster.getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+ Connector c = getConnector();
+ c.tableOperations().create(tableName);
+ // count the number of WALs in the filesystem
+ assertEquals(2, countWALsInFS(cluster));
+ cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
+ cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
+ cluster.getClusterControl().start(ServerType.TABLET_SERVER);
+ Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator()); // read the whole metadata table; the returned size is discarded
+ // let GC run
+ UtilWaitThread.sleep(3 * 5 * 1000);
+ assertEquals(2, countWALsInFS(cluster));
+ }
+ // Counts the non-directory entries found recursively under <accumulo dir>/wal.
+ private int countWALsInFS(MiniAccumuloClusterImpl cluster) throws Exception {
+ FileSystem fs = cluster.getFileSystem();
+ RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(cluster.getConfig().getAccumuloDir() + "/wal"), true);
+ int result = 0;
+ while (iterator.hasNext()) {
+ LocatedFileStatus next = iterator.next();
+ if (!next.isDirectory()) {
+ result++;
+ }
+ }
+ return result;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
index b78a311..27f1f69 100644
--- a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.test;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.File;
-import java.util.Collections;
import java.util.UUID;
import org.apache.accumulo.core.client.BatchWriter;
@@ -27,6 +26,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
@@ -127,11 +127,7 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT {
String tableId = conn.tableOperations().tableIdMap().get(tableName);
Assert.assertNotNull("Table ID was null", tableId);
- LogEntry logEntry = new LogEntry();
- logEntry.server = "127.0.0.1:12345";
- logEntry.filename = emptyWalog.toURI().toString();
- logEntry.tabletId = 10;
- logEntry.logSet = Collections.singleton(logEntry.filename);
+ LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId), null, null), 0, "127.0.0.1:12345", emptyWalog.toURI().toString());
log.info("Taking {} offline", tableName);
conn.tableOperations().offline(tableName, true);
@@ -186,11 +182,7 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT {
String tableId = conn.tableOperations().tableIdMap().get(tableName);
Assert.assertNotNull("Table ID was null", tableId);
- LogEntry logEntry = new LogEntry();
- logEntry.server = "127.0.0.1:12345";
- logEntry.filename = partialHeaderWalog.toURI().toString();
- logEntry.tabletId = 10;
- logEntry.logSet = Collections.singleton(logEntry.filename);
+ LogEntry logEntry = new LogEntry(null, 0, "127.0.0.1:12345", partialHeaderWalog.toURI().toString());
log.info("Taking {} offline", tableName);
conn.tableOperations().offline(tableName, true);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java
deleted file mode 100644
index 6a9975c..0000000
--- a/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Map.Entry;
-
-import org.apache.accumulo.cluster.ClusterControl;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.harness.AccumuloClusterIT;
-import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.FunctionalTestUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.Text;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-// Verify that a recovery of a log without any mutations removes the log reference
-public class NoMutationRecoveryIT extends AccumuloClusterIT {
- private static final Logger log = LoggerFactory.getLogger(NoMutationRecoveryIT.class);
-
- @Override
- public int defaultTimeoutSeconds() {
- return 10 * 60;
- }
-
- @Override
- public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(1);
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- }
-
- @Before
- public void takeTraceTableOffline() throws Exception {
- Connector conn = getConnector();
- if (conn.tableOperations().exists("trace")) {
- conn.tableOperations().offline("trace", true);
- }
- }
-
- @After
- public void takeTraceTableOnline() throws Exception {
- Connector conn = getConnector();
- if (conn.tableOperations().exists("trace")) {
- conn.tableOperations().online("trace", true);
- }
- }
-
- public boolean equals(Entry<Key,Value> a, Entry<Key,Value> b) {
- // comparison, without timestamp
- Key akey = a.getKey();
- Key bkey = b.getKey();
- log.info("Comparing {} to {}", akey.toStringNoTruncate(), bkey.toStringNoTruncate());
- return akey.compareTo(bkey, PartialKey.ROW_COLFAM_COLQUAL_COLVIS) == 0 && a.getValue().equals(b.getValue());
- }
-
- @Test
- public void test() throws Exception {
- Connector conn = getConnector();
- final String table = getUniqueNames(1)[0];
- conn.tableOperations().create(table);
- String tableId = conn.tableOperations().tableIdMap().get(table);
-
- log.info("Created {} with id {}", table, tableId);
-
- // Add a record to the table
- update(conn, table, new Text("row"), new Text("cf"), new Text("cq"), new Value("value".getBytes()));
-
- // Get the WAL reference used by the table we just added the update to
- Entry<Key,Value> logRef = getLogRef(conn, MetadataTable.NAME);
-
- log.info("Log reference in metadata table {} {}", logRef.getKey().toStringNoTruncate(), logRef.getValue());
-
- // Flush the record to disk
- conn.tableOperations().flush(table, null, null, true);
-
- Range range = Range.prefix(tableId);
- log.info("Fetching WAL references over " + table);
- assertEquals("should not have any refs", 0, FunctionalTestUtils.count(getLogRefs(conn, MetadataTable.NAME, range)));
-
- // Grant permission to the admin user to write to the Metadata table
- conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE);
-
- // Add the wal record back to the metadata table
- update(conn, MetadataTable.NAME, logRef);
-
- // Assert that we can get the bogus update back out again
- assertTrue(equals(logRef, getLogRef(conn, MetadataTable.NAME)));
-
- conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
- conn.tableOperations().flush(RootTable.NAME, null, null, true);
-
- ClusterControl control = cluster.getClusterControl();
- control.stopAllServers(ServerType.TABLET_SERVER);
- control.startAllServers(ServerType.TABLET_SERVER);
-
- // Verify that we can read the original record we wrote
- Scanner s = conn.createScanner(table, Authorizations.EMPTY);
- int count = 0;
- for (Entry<Key,Value> e : s) {
- assertEquals(e.getKey().getRow().toString(), "row");
- assertEquals(e.getKey().getColumnFamily().toString(), "cf");
- assertEquals(e.getKey().getColumnQualifier().toString(), "cq");
- assertEquals(e.getValue().toString(), "value");
- count++;
- }
- assertEquals(1, count);
-
- // Verify that the bogus log reference we wrote it gone
- for (Entry<Key,Value> ref : getLogRefs(conn, MetadataTable.NAME)) {
- assertFalse("Unexpected found reference to bogus log entry: " + ref.getKey().toStringNoTruncate() + " " + ref.getValue(), equals(ref, logRef));
- }
- }
-
- private void update(Connector conn, String name, Entry<Key,Value> logRef) throws Exception {
- Key k = logRef.getKey();
- update(conn, name, k.getRow(), k.getColumnFamily(), k.getColumnQualifier(), logRef.getValue());
- }
-
- private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table) throws Exception {
- return getLogRefs(conn, table, new Range());
- }
-
- private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table, Range r) throws Exception {
- Scanner s = conn.createScanner(table, Authorizations.EMPTY);
- s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
- s.setRange(r);
- return s;
- }
-
- private Entry<Key,Value> getLogRef(Connector conn, String table) throws Exception {
- return getLogRefs(conn, table).iterator().next();
- }
-
- private void update(Connector conn, String table, Text row, Text cf, Text cq, Value value) throws Exception {
- BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
- Mutation m = new Mutation(row);
- m.put(cf, cq, value);
- bw.addMutation(m);
- bw.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
index 8b37169..5c5b95d 100644
--- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
@@ -349,7 +349,7 @@ public class ShellServerIT extends SharedMiniClusterIT {
ts.exec("config -t " + table2 + " -np", true, "345M", true);
ts.exec("getsplits -t " + table2, true, "row5", true);
ts.exec("constraint --list -t " + table2, true, "VisibilityConstraint=2", true);
- ts.exec("onlinetable " + table, true);
+ ts.exec("online " + table, true);
ts.exec("deletetable -f " + table, true);
ts.exec("deletetable -f " + table2, true);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
new file mode 100644
index 0000000..03d783c
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+// When reviewing the changes for ACCUMULO-3423, kturner suggested:
+// "tablets will now have log references that contain no data,
+// so it may be marked with 3 WALs, the first with data, the 2nd without, a 3rd with data.
+// It would be useful to have an IT that will test this situation."
+public class UnusedWALIT extends ConfigurableMacIT {
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ final long logSize = 1024 * 1024 * 10;
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+ cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, Long.toString(logSize));
+ cfg.setNumTservers(1);
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ hadoopCoreSite.set("fs.namenode.fs-limits.min-block-size", Long.toString(logSize));
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void test() throws Exception {
+ // don't want this bad boy cleaning up walog entries
+ getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+
+ // make two tables
+ String[] tableNames = getUniqueNames(2);
+ String bigTable = tableNames[0];
+ String lilTable = tableNames[1];
+ Connector c = getConnector();
+ c.tableOperations().create(bigTable);
+ c.tableOperations().create(lilTable);
+
+ // put some data in a log that should be replayed for both tables
+ writeSomeData(c, bigTable, 0, 10, 0, 10);
+ scanSomeData(c, bigTable, 0, 10, 0, 10);
+ writeSomeData(c, lilTable, 0, 1, 0, 1);
+ scanSomeData(c, lilTable, 0, 1, 0, 1);
+ assertEquals(1, getWALCount(c));
+
+ // roll the logs by pushing data into bigTable
+ writeSomeData(c, bigTable, 0, 3000, 0, 1000);
+ assertEquals(2, getWALCount(c));
+
+ // put some data in the latest log
+ writeSomeData(c, lilTable, 1, 10, 0, 10);
+ scanSomeData(c, lilTable, 1, 10, 0, 10);
+
+ // bounce the tserver
+ getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
+ getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+
+ // wait for the metadata table to be online
+ Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+
+ // check our two sets of data in different logs
+ scanSomeData(c, lilTable, 0, 1, 0, 1);
+ scanSomeData(c, lilTable, 1, 10, 0, 10);
+ }
+
+ private void scanSomeData(Connector c, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception {
+ Scanner s = c.createScanner(table, Authorizations.EMPTY);
+ s.setRange(new Range(Integer.toHexString(startRow), Integer.toHexString(startRow + rowCount)));
+ int row = startRow;
+ int col = startCol;
+ for (Entry<Key,Value> entry : s) {
+ assertEquals(row, Integer.parseInt(entry.getKey().getRow().toString(), 16));
+ assertEquals(col++, Integer.parseInt(entry.getKey().getColumnQualifier().toString(), 16));
+ if (col == startCol + colCount) {
+ col = startCol;
+ row++;
+ if (row == startRow + rowCount) {
+ break;
+ }
+ }
+ }
+ assertEquals(row, startRow + rowCount);
+ }
+
+ private int getWALCount(Connector c) throws Exception {
+ Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(CurrentLogsSection.getRange());
+ try {
+ return Iterators.size(s.iterator());
+ } finally {
+ s.close();
+ }
+ }
+
+ private void writeSomeData(Connector conn, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception {
+ BatchWriterConfig config = new BatchWriterConfig();
+ config.setMaxMemory(10 * 1024 * 1024);
+ BatchWriter bw = conn.createBatchWriter(table, config);
+ for (int r = startRow; r < startRow + rowCount; r++) {
+ Mutation m = new Mutation(Integer.toHexString(r));
+ for (int c = startCol; c < startCol + colCount; c++) {
+ m.put("", Integer.toHexString(c), "");
+ }
+ bw.addMutation(m);
+ }
+ bw.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
index d9b9429..e2a0e03 100644
--- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
@@ -103,6 +103,7 @@ public class VolumeIT extends ConfigurableMacIT {
cfg.setProperty(Property.INSTANCE_DFS_DIR, v1Uri.getPath());
cfg.setProperty(Property.INSTANCE_DFS_URI, v1Uri.getScheme() + v1Uri.getHost());
cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString());
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
// use raw local file system so walogs sync and flush will work
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
@@ -425,6 +426,21 @@ public class VolumeIT extends ConfigurableMacIT {
Assert.fail("Unexpected volume " + path);
}
+ Text path = new Text();
+ for (String table : new String[]{RootTable.NAME, MetadataTable.NAME}) {
+ Scanner meta = conn.createScanner(table, Authorizations.EMPTY);
+ meta.setRange(MetadataSchema.CurrentLogsSection.getRange());
+ outer: for (Entry<Key,Value> entry : meta) {
+ MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+ for (int i = 0; i < paths.length; i++) {
+ if (path.toString().startsWith(paths[i].toString())) {
+ continue outer;
+ }
+ }
+ Assert.fail("Unexpected volume " + path);
+ }
+ }
+
// if a volume is chosen randomly for each tablet, then the probability that a volume will not be chosen for any tablet is ((num_volumes -
// 1)/num_volumes)^num_tablets. For 100 tablets and 3 volumes the probability that only 2 volumes would be chosen is 2.46e-18
@@ -435,6 +451,7 @@ public class VolumeIT extends ConfigurableMacIT {
}
Assert.assertEquals(200, sum);
+
}
@Test
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
index 099743d..1f3e600 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@ -57,6 +57,7 @@ import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
@@ -72,9 +73,11 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.accumulo.harness.AccumuloClusterIT;
import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.TestMultiTableIngest;
import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.Test;
@@ -84,6 +87,11 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
public class ReadWriteIT extends AccumuloClusterIT {
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+ }
+
private static final Logger log = LoggerFactory.getLogger(ReadWriteIT.class);
static final int ROWS = 200000;
[5/5] accumulo git commit: ACCUMULO-3488 ACCUMULO-3423 sync these two
changes
Posted by ec...@apache.org.
ACCUMULO-3488 ACCUMULO-3423 sync these two changes
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/55981ad8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/55981ad8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/55981ad8
Branch: refs/heads/1.7
Commit: 55981ad881e7b79423799a3473856b4b9d768689
Parents: 3fdd29f
Author: Eric C. Newton <er...@gmail.com>
Authored: Fri Apr 24 19:05:50 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Fri Apr 24 19:05:50 2015 -0400
----------------------------------------------------------------------
.../base/src/main/java/org/apache/accumulo/server/TabletLevel.java | 2 +-
.../accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java | 1 -
.../test/java/org/apache/accumulo/tserver/log/LogEntryTest.java | 2 +-
.../apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/WALSunnyDayIT.java | 2 +-
.../java/org/apache/accumulo/test/replication/ReplicationIT.java | 2 +-
6 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/55981ad8/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java b/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
index 91e5ee9..c87c7e1 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
@@ -16,7 +16,7 @@
*/
package org.apache.accumulo.server;
-import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.impl.KeyExtent;
public enum TabletLevel {
ROOT,
http://git-wip-us.apache.org/repos/asf/accumulo/blob/55981ad8/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index 78a5bd5..9fcfec9 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -48,7 +48,6 @@ import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/55981ad8/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
index 44058d3..ee3c621 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
@@ -19,8 +19,8 @@ package org.apache.accumulo.tserver.log;
import static org.junit.Assert.assertEquals;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.hadoop.io.Text;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/55981ad8/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
index 27f1f69..e315841 100644
--- a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
@@ -26,8 +26,8 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/55981ad8/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
index 490bd7c..9f22466 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -40,9 +40,9 @@ import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/55981ad8/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 3c1cbeb..8bd5641 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -46,10 +46,10 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.conf.ColumnSet;
import org.apache.accumulo.core.metadata.MetadataTable;
[3/5] accumulo git commit: ACCUMULO-3423 optimize WAL metadata table
updates
Posted by ec...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 1735c0d..9f537af 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -18,7 +18,7 @@ package org.apache.accumulo.gc;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -28,49 +28,56 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
-import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
-import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
-import org.apache.accumulo.core.trace.Tracer;
-import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.master.LiveTServerSet;
+import org.apache.accumulo.server.master.LiveTServerSet.Listener;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.RootTabletStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletState;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.thrift.TException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.Text;
+import org.apache.htrace.Span;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
import com.google.common.net.HostAndPort;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -79,8 +86,8 @@ public class GarbageCollectWriteAheadLogs {
private final AccumuloServerContext context;
private final VolumeManager fs;
-
- private boolean useTrash;
+ private final boolean useTrash;
+ private final LiveTServerSet liveServers;
/**
* Creates a new GC WAL object.
@@ -96,56 +103,37 @@ public class GarbageCollectWriteAheadLogs {
this.context = context;
this.fs = fs;
this.useTrash = useTrash;
- }
-
- /**
- * Gets the instance used by this object.
- *
- * @return instance
- */
- Instance getInstance() {
- return context.getInstance();
- }
-
- /**
- * Gets the volume manager used by this object.
- *
- * @return volume manager
- */
- VolumeManager getVolumeManager() {
- return fs;
- }
-
- /**
- * Checks if the volume manager should move files to the trash rather than delete them.
- *
- * @return true if trash is used
- */
- boolean isUsingTrash() {
- return useTrash;
+ this.liveServers = new LiveTServerSet(context, new Listener() {
+ @Override
+ public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
+ log.debug("New tablet servers noticed: " + added);
+ log.debug("Tablet servers removed: " + deleted);
+ }
+ });
+ liveServers.startListeningForTabletServerChanges();
}
public void collect(GCStatus status) {
- Span span = Trace.start("scanServers");
+ Span span = Trace.start("getCandidates");
try {
+ Set<TServerInstance> currentServers = liveServers.getCurrentServers();
- Map<String,Path> sortedWALogs = getSortedWALogs();
status.currentLog.started = System.currentTimeMillis();
- Map<Path,String> fileToServerMap = new HashMap<Path,String>();
- Map<String,Path> nameToFileMap = new HashMap<String,Path>();
- int count = scanServers(fileToServerMap, nameToFileMap);
+ Map<TServerInstance, Set<Path> > candidates = new HashMap<>();
+ long count = getCurrent(candidates, currentServers);
long fileScanStop = System.currentTimeMillis();
- log.info(String.format("Fetched %d files from %d servers in %.2f seconds", fileToServerMap.size(), count,
+
+ log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count, candidates.size(),
(fileScanStop - status.currentLog.started) / 1000.));
- status.currentLog.candidates = fileToServerMap.size();
+ status.currentLog.candidates = count;
span.stop();
- span = Trace.start("removeMetadataEntries");
+ span = Trace.start("removeEntriesInUse");
try {
- count = removeMetadataEntries(nameToFileMap, sortedWALogs, status);
+ count = removeEntriesInUse(candidates, status, currentServers);
} catch (Exception ex) {
log.error("Unable to scan metadata table", ex);
return;
@@ -158,7 +146,7 @@ public class GarbageCollectWriteAheadLogs {
span = Trace.start("removeReplicationEntries");
try {
- count = removeReplicationEntries(nameToFileMap, sortedWALogs, status);
+ count = removeReplicationEntries(candidates, status);
} catch (Exception ex) {
log.error("Unable to scan replication table", ex);
return;
@@ -170,16 +158,23 @@ public class GarbageCollectWriteAheadLogs {
log.info(String.format("%d replication entries scanned in %.2f seconds", count, (replicationEntryScanStop - logEntryScanStop) / 1000.));
span = Trace.start("removeFiles");
- Map<String,ArrayList<Path>> serverToFileMap = mapServersToFiles(fileToServerMap, nameToFileMap);
- count = removeFiles(nameToFileMap, serverToFileMap, sortedWALogs, status);
+ count = removeFiles(candidates, status);
long removeStop = System.currentTimeMillis();
- log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, serverToFileMap.size(), (removeStop - logEntryScanStop) / 1000.));
+ log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, candidates.size(), (removeStop - logEntryScanStop) / 1000.));
+ span.stop();
+
+ span = Trace.start("removeMarkers");
+ count = removeTabletServerMarkers(candidates);
+ long removeMarkersStop = System.currentTimeMillis();
+ log.info(String.format("%d markers removed in %.2f seconds", count, (removeMarkersStop - removeStop) / 1000.));
+ span.stop();
+
+
status.currentLog.finished = removeStop;
status.lastLog = status.currentLog;
status.currentLog = new GcCycleStats();
- span.stop();
} catch (Exception e) {
log.error("exception occured while garbage collecting write ahead logs", e);
@@ -188,161 +183,101 @@ public class GarbageCollectWriteAheadLogs {
}
}
- boolean holdsLock(HostAndPort addr) {
+ private long removeTabletServerMarkers(Map<TServerInstance,Set<Path>> candidates) {
+ long result = 0;
try {
- String zpath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addr.toString();
- List<String> children = ZooReaderWriter.getInstance().getChildren(zpath);
- return !(children == null || children.isEmpty());
- } catch (KeeperException.NoNodeException ex) {
- return false;
- } catch (Exception ex) {
- log.debug(ex.toString(), ex);
- return true;
- }
- }
-
- private int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
- for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
- if (entry.getKey().isEmpty()) {
- // old-style log entry, just remove it
- for (Path path : entry.getValue()) {
- log.debug("Removing old-style WAL " + path);
- try {
- if (!useTrash || !fs.moveToTrash(path))
- fs.deleteRecursively(path);
- status.currentLog.deleted++;
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ex) {
- log.error("Unable to delete wal " + path + ": " + ex);
- }
- }
- } else {
- HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
- if (!holdsLock(address)) {
+ BatchWriter root = null;
+ BatchWriter meta = null;
+ try {
+ root = context.getConnector().createBatchWriter(RootTable.NAME, null);
+ meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
+ for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
+ Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
for (Path path : entry.getValue()) {
- log.debug("Removing WAL for offline server " + path);
- try {
- if (!useTrash || !fs.moveToTrash(path))
- fs.deleteRecursively(path);
- status.currentLog.deleted++;
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ex) {
- log.error("Unable to delete wal " + path + ": " + ex);
- }
- }
- continue;
- } else {
- Client tserver = null;
- try {
- tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context);
- tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue()));
- log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
- status.currentLog.deleted += entry.getValue().size();
- } catch (TException e) {
- log.warn("Error talking to " + address + ": " + e);
- } finally {
- if (tserver != null)
- ThriftUtil.returnClient(tserver);
+ m.putDelete(CurrentLogsSection.COLF, new Text(path.toString()));
+ result++;
}
+ root.addMutation(m);
+ meta.addMutation(m);
+ }
+ } finally {
+ if (meta != null) {
+ meta.close();
+ }
+ if (root != null) {
+ root.close();
}
}
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
}
+ return result;
+ }
- for (Path swalog : sortedWALogs.values()) {
- log.debug("Removing sorted WAL " + swalog);
- try {
- if (!useTrash || !fs.moveToTrash(swalog)) {
- fs.deleteRecursively(swalog);
- }
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ioe) {
+ private long removeFiles(Map<TServerInstance, Set<Path> > candidates, final GCStatus status) {
+ for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
+ for (Path path : entry.getValue()) {
+ log.debug("Removing unused WAL for server " + entry.getKey() + " log " + path);
try {
- if (fs.exists(swalog)) {
- log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
- }
+ if (!useTrash || !fs.moveToTrash(path))
+ fs.deleteRecursively(path);
+ status.currentLog.deleted++;
+ } catch (FileNotFoundException ex) {
+ // ignored
} catch (IOException ex) {
- log.error("Unable to check for the existence of " + swalog, ex);
+ log.error("Unable to delete wal " + path + ": " + ex);
}
}
}
-
- return 0;
+ return status.currentLog.deleted;
}
- /**
- * Converts a list of paths to their corresponding strings.
- *
- * @param paths
- * list of paths
- * @return string forms of paths
- */
- static List<String> paths2strings(List<Path> paths) {
- List<String> result = new ArrayList<String>(paths.size());
- for (Path path : paths)
- result.add(path.toString());
- return result;
+ private UUID path2uuid(Path path) {
+ return UUID.fromString(path.getName());
}
- /**
- * Reverses the given mapping of file paths to servers. The returned map provides a list of file paths for each server. Any path whose name is not in the
- * mapping of file names to paths is skipped.
- *
- * @param fileToServerMap
- * map of file paths to servers
- * @param nameToFileMap
- * map of file names to paths
- * @return map of servers to lists of file paths
- */
- static Map<String,ArrayList<Path>> mapServersToFiles(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) {
- Map<String,ArrayList<Path>> result = new HashMap<String,ArrayList<Path>>();
- for (Entry<Path,String> fileServer : fileToServerMap.entrySet()) {
- if (!nameToFileMap.containsKey(fileServer.getKey().getName()))
- continue;
- ArrayList<Path> files = result.get(fileServer.getValue());
- if (files == null) {
- files = new ArrayList<Path>();
- result.put(fileServer.getValue(), files);
- }
- files.add(fileServer.getKey());
- }
- return result;
- }
-
- protected int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+ private long removeEntriesInUse(Map<TServerInstance, Set<Path> > candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException, KeeperException,
InterruptedException {
- int count = 0;
- Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(context);
-
- // For each WAL reference in the metadata table
- while (iterator.hasNext()) {
- // Each metadata reference has at least one WAL file
- for (String entry : iterator.next().logSet) {
- // old style WALs will have the IP:Port of their logger and new style will either be a Path either absolute or relative, in all cases
- // the last "/" will mark a UUID file name.
- String uuid = entry.substring(entry.lastIndexOf("/") + 1);
- if (!isUUID(uuid)) {
- // fully expect this to be a uuid, if its not then something is wrong and walog GC should not proceed!
- throw new IllegalArgumentException("Expected uuid, but got " + uuid + " from " + entry);
- }
- Path pathFromNN = nameToFileMap.remove(uuid);
- if (pathFromNN != null) {
- status.currentLog.inUse++;
- sortedWALogs.remove(uuid);
- }
+ // remove any entries if there's a log reference, or a tablet is still assigned to the dead server
- count++;
+ Map<UUID, TServerInstance> walToDeadServer = new HashMap<>();
+ for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
+ for (Path file : entry.getValue()) {
+ walToDeadServer.put(path2uuid(file), entry.getKey());
+ }
+ }
+ long count = 0;
+ RootTabletStateStore root = new RootTabletStateStore(context);
+ MetaDataStateStore meta = new MetaDataStateStore(context);
+ Iterator<TabletLocationState> states = Iterators.concat(root.iterator(), meta.iterator());
+ while (states.hasNext()) {
+ count++;
+ TabletLocationState state = states.next();
+ if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
+ candidates.remove(state.current);
+ }
+ for (Collection<String> wals : state.walogs) {
+ for (String wal : wals) {
+ UUID walUUID = path2uuid(new Path(wal));
+ TServerInstance dead = walToDeadServer.get(walUUID);
+ if (dead != null) {
+ Iterator<Path> iter = candidates.get(dead).iterator();
+ while (iter.hasNext()) {
+ if (path2uuid(iter.next()).equals(walUUID)) {
+ iter.remove();
+ break;
+ }
+ }
+ }
+ }
}
}
-
return count;
}
- protected int removeReplicationEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
- InterruptedException {
+ protected int removeReplicationEntries(Map<TServerInstance, Set<Path> > candidates, GCStatus status) throws IOException, KeeperException,
+ InterruptedException {
Connector conn;
try {
conn = context.getConnector();
@@ -353,21 +288,25 @@ public class GarbageCollectWriteAheadLogs {
int count = 0;
- Iterator<Entry<String,Path>> walIter = nameToFileMap.entrySet().iterator();
+ Iterator<Entry<TServerInstance,Set<Path>>> walIter = candidates.entrySet().iterator();
while (walIter.hasNext()) {
- Entry<String,Path> wal = walIter.next();
- String fullPath = wal.getValue().toString();
- if (neededByReplication(conn, fullPath)) {
- log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
- // If we haven't already removed it, check to see if this WAL is
- // "in use" by replication (needed for replication purposes)
- status.currentLog.inUse++;
-
+ Entry<TServerInstance,Set<Path>> wal = walIter.next();
+ Iterator<Path> paths = wal.getValue().iterator();
+ while (paths.hasNext()) {
+ Path fullPath = paths.next();
+ if (neededByReplication(conn, fullPath)) {
+ log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
+ // If we haven't already removed it, check to see if this WAL is
+ // "in use" by replication (needed for replication purposes)
+ status.currentLog.inUse++;
+ paths.remove();
+ } else {
+ log.debug("WAL not needed for replication {}", fullPath);
+ }
+ }
+ if (wal.getValue().isEmpty()) {
walIter.remove();
- sortedWALogs.remove(wal.getKey());
- } else {
- log.debug("WAL not needed for replication {}", fullPath);
}
count++;
}
@@ -375,6 +314,7 @@ public class GarbageCollectWriteAheadLogs {
return count;
}
+
/**
* Determine if the given WAL is needed for replication
*
@@ -382,7 +322,7 @@ public class GarbageCollectWriteAheadLogs {
* The full path (URI)
* @return True if the WAL is still needed by replication (not a candidate for deletion)
*/
- protected boolean neededByReplication(Connector conn, String wal) {
+ protected boolean neededByReplication(Connector conn, Path wal) {
log.info("Checking replication table for " + wal);
Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal);
@@ -405,7 +345,7 @@ public class GarbageCollectWriteAheadLogs {
return false;
}
- protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
+ protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, Path wal) {
Scanner metaScanner;
try {
metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
@@ -425,7 +365,7 @@ public class GarbageCollectWriteAheadLogs {
StatusSection.limit(replScanner);
// Only look for this specific WAL
- replScanner.setRange(Range.exact(wal));
+ replScanner.setRange(Range.exact(wal.toString()));
return Iterables.concat(metaScanner, replScanner);
} catch (ReplicationTableOfflineException e) {
@@ -435,107 +375,84 @@ public class GarbageCollectWriteAheadLogs {
return metaScanner;
}
- private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
- return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
- }
-
- /**
- * Scans write-ahead log directories for logs. The maps passed in are populated with scan information.
- *
- * @param walDirs
- * write-ahead log directories
- * @param fileToServerMap
- * map of file paths to servers
- * @param nameToFileMap
- * map of file names to paths
- * @return number of servers located (including those with no logs present)
- */
- int scanServers(String[] walDirs, Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
- Set<String> servers = new HashSet<String>();
- for (String walDir : walDirs) {
- Path walRoot = new Path(walDir);
- FileStatus[] listing = null;
- try {
- listing = fs.listStatus(walRoot);
- } catch (FileNotFoundException e) {
- // ignore dir
- }
- if (listing == null)
- continue;
- for (FileStatus status : listing) {
- String server = status.getPath().getName();
- if (status.isDirectory()) {
- servers.add(server);
- for (FileStatus file : fs.listStatus(new Path(walRoot, server))) {
- if (isUUID(file.getPath().getName())) {
- fileToServerMap.put(file.getPath(), server);
- nameToFileMap.put(file.getPath().getName(), file.getPath());
- } else {
- log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid");
- }
- }
- } else if (isUUID(server)) {
- // old-style WAL are not under a directory
- servers.add("");
- fileToServerMap.put(status.getPath(), "");
- nameToFileMap.put(server, status.getPath());
- } else {
- log.info("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
- }
- }
- }
- return servers.size();
- }
- private Map<String,Path> getSortedWALogs() throws IOException {
- return getSortedWALogs(ServerConstants.getRecoveryDirs());
- }
/**
- * Looks for write-ahead logs in recovery directories.
+ * Scans log markers and WAL directories. The map passed in is populated with logs of dead servers and logs marked unused.
*
- * @param recoveryDirs
- * recovery directories
- * @return map of log file names to paths
+ * @param unusedLogs
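+ * map of tablet server to candidate log files; populated by this method
+ * @return number of log files found through log markers; files found only by the WAL directory scan are not counted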
*/
- Map<String,Path> getSortedWALogs(String[] recoveryDirs) throws IOException {
- Map<String,Path> result = new HashMap<String,Path>();
-
- for (String dir : recoveryDirs) {
- Path recoveryDir = new Path(dir);
-
- if (fs.exists(recoveryDir)) {
- for (FileStatus status : fs.listStatus(recoveryDir)) {
- String name = status.getPath().getName();
- if (isUUID(name)) {
- result.put(name, status.getPath());
- } else {
- log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
- }
+ private long getCurrent(Map<TServerInstance, Set<Path> > unusedLogs, Set<TServerInstance> currentServers) throws Exception {
+ Set<Path> rootWALs = new HashSet<>();
+ // Get entries in zookeeper:
+ String zpath = ZooUtil.getRoot(context.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
+ ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ List<String> children = zoo.getChildren(zpath);
+ for (String child : children) {
+ LogEntry entry = LogEntry.fromBytes(zoo.getData(zpath + "/" + child, null));
+ rootWALs.add(new Path(entry.filename));
+ }
+ long count = 0;
+
+ // get all the WAL markers that are not in zookeeper for dead servers
+ Scanner rootScanner = context.getConnector().createScanner(RootTable.NAME, Authorizations.EMPTY);
+ rootScanner.setRange(CurrentLogsSection.getRange());
+ Scanner metaScanner = context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ metaScanner.setRange(CurrentLogsSection.getRange());
+ Iterator<Entry<Key,Value>> entries = Iterators.concat(rootScanner.iterator(), metaScanner.iterator());
+ Text hostAndPort = new Text();
+ Text sessionId = new Text();
+ Text filename = new Text();
+ while (entries.hasNext()) {
+ Entry<Key,Value> entry = entries.next();
+ CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId);
+ CurrentLogsSection.getPath(entry.getKey(), filename);
+ TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString());
+ Path path = new Path(filename.toString());
+ if (!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED) && !rootWALs.contains(path)) {
+ Set<Path> logs = unusedLogs.get(tsi);
+ if (logs == null) {
+ unusedLogs.put(tsi, logs = new HashSet<Path>());
+ }
+ if (logs.add(path)) {
+ count++;
}
}
}
- return result;
- }
- /**
- * Checks if a string is a valid UUID.
- *
- * @param name
- * string to check
- * @return true if string is a UUID
- */
- static boolean isUUID(String name) {
- if (name == null || name.length() != 36) {
- return false;
- }
- try {
- UUID.fromString(name);
- return true;
- } catch (IllegalArgumentException ex) {
- return false;
+ // scan HDFS for logs for dead servers
+ for (Volume volume : VolumeManagerImpl.get().getVolumes()) {
+ RemoteIterator<LocatedFileStatus> iter = volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true);
+ while (iter.hasNext()) {
+ LocatedFileStatus next = iter.next();
+ // defensive: FileSystem.listFiles is documented to return only files, but skip any directory entry anyway
+ if (next.isDirectory()) {
+ continue;
+ }
+ // make sure we've waited long enough for zookeeper propagation
+ if (System.currentTimeMillis() - next.getModificationTime() < context.getConnector().getInstance().getZooKeepersSessionTimeOut()) {
+ continue;
+ }
+ Path path = next.getPath();
+ String hostPlusPort = path.getParent().getName();
+ // server is still alive, or has a replacement
+ TServerInstance instance = liveServers.find(hostPlusPort);
+ if (instance != null) {
+ continue;
+ }
+ TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
+ Set<Path> paths = unusedLogs.get(fake);
+ if (paths == null) {
+ paths = new HashSet<>();
+ }
+ paths.add(path);
+ unusedLogs.put(fake, paths);
+ }
}
+ return count;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 037023a..4f64c15 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -569,7 +569,6 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
replSpan.stop();
}
- // Clean up any unused write-ahead logs
Span waLogs = Trace.start("walogs");
try {
GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 3a32727..455aaee 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -37,13 +37,11 @@ import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
@@ -186,20 +184,21 @@ public class CloseWriteAheadLogReferences implements Runnable {
try {
// TODO Configurable number of threads
bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
- bs.fetchColumnFamily(LogColumnFamily.NAME);
+ bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
+ bs.fetchColumnFamily(CurrentLogsSection.COLF);
// For each log key/value in the metadata table
for (Entry<Key,Value> entry : bs) {
- // The value may contain multiple WALs
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-
- log.debug("Found WALs for table(" + logEntry.extent.getTableId() + "): " + logEntry.logSet);
+ if (entry.getValue().equals(CurrentLogsSection.UNUSED)) {
+ continue;
+ }
+ Text tpath = new Text();
+ CurrentLogsSection.getPath(entry.getKey(), tpath);
+ String path = new Path(tpath.toString()).toString();
+ log.debug("Found WAL " + path.toString());
// Normalize each log file (using Path) and add it to the set
- for (String logFile : logEntry.logSet) {
- referencedWals.add(normalizedWalPaths.get(logFile));
- }
+ referencedWals.add(normalizedWalPaths.get(path));
}
} catch (TableNotFoundException e) {
// uhhhh
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
deleted file mode 100644
index 5801faa..0000000
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ /dev/null
@@ -1,567 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.gc;
-
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.conf.SiteConfiguration;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.gc.thrift.GCStatus;
-import org.apache.accumulo.core.gc.thrift.GcCycleStats;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.conf.ServerConfigurationFactory;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.replication.StatusUtil;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-public class GarbageCollectWriteAheadLogsTest {
- private static final long BLOCK_SIZE = 64000000L;
-
- private static final Path DIR_1_PATH = new Path("/dir1");
- private static final Path DIR_2_PATH = new Path("/dir2");
- private static final Path DIR_3_PATH = new Path("/dir3");
- private static final String UUID1 = UUID.randomUUID().toString();
- private static final String UUID2 = UUID.randomUUID().toString();
- private static final String UUID3 = UUID.randomUUID().toString();
-
- private Instance instance;
- private AccumuloConfiguration systemConfig;
- private VolumeManager volMgr;
- private GarbageCollectWriteAheadLogs gcwal;
- private long modTime;
-
- @Rule
- public TestName testName = new TestName();
-
- @Before
- public void setUp() throws Exception {
- SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
- instance = createMock(Instance.class);
- expect(instance.getInstanceID()).andReturn("mock").anyTimes();
- expect(instance.getZooKeepers()).andReturn("localhost").anyTimes();
- expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
- systemConfig = new ConfigurationCopy(new HashMap<String,String>());
- volMgr = createMock(VolumeManager.class);
- ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
- expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes();
- expect(factory.getInstance()).andReturn(instance).anyTimes();
- expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
-
- // Just make the SiteConfiguration delegate to our AccumuloConfiguration
- // Presently, we only need get(Property) and iterator().
- EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
- @Override
- public String answer() {
- Object[] args = EasyMock.getCurrentArguments();
- return systemConfig.get((Property) args[0]);
- }
- }).anyTimes();
- EasyMock.expect(siteConfig.getBoolean(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<Boolean>() {
- @Override
- public Boolean answer() {
- Object[] args = EasyMock.getCurrentArguments();
- return systemConfig.getBoolean((Property) args[0]);
- }
- }).anyTimes();
-
- EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
- @Override
- public Iterator<Entry<String,String>> answer() {
- return systemConfig.iterator();
- }
- }).anyTimes();
-
- replay(instance, factory, siteConfig);
- AccumuloServerContext context = new AccumuloServerContext(factory);
- gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false);
- modTime = System.currentTimeMillis();
- }
-
- @Test
- public void testGetters() {
- assertSame(instance, gcwal.getInstance());
- assertSame(volMgr, gcwal.getVolumeManager());
- assertFalse(gcwal.isUsingTrash());
- }
-
- @Test
- public void testPathsToStrings() {
- ArrayList<Path> paths = new ArrayList<Path>();
- paths.add(new Path(DIR_1_PATH, "file1"));
- paths.add(DIR_2_PATH);
- paths.add(new Path(DIR_3_PATH, "file3"));
- List<String> strings = GarbageCollectWriteAheadLogs.paths2strings(paths);
- int len = 3;
- assertEquals(len, strings.size());
- for (int i = 0; i < len; i++) {
- assertEquals(paths.get(i).toString(), strings.get(i));
- }
- }
-
- @Test
- public void testMapServersToFiles() {
- // @formatter:off
- /*
- * Test fileToServerMap:
- * /dir1/server1/uuid1 -> server1 (new-style)
- * /dir1/uuid2 -> "" (old-style)
- * /dir3/server3/uuid3 -> server3 (new-style)
- */
- // @formatter:on
- Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
- Path path1 = new Path(new Path(DIR_1_PATH, "server1"), UUID1);
- fileToServerMap.put(path1, "server1"); // new-style
- Path path2 = new Path(DIR_1_PATH, UUID2);
- fileToServerMap.put(path2, ""); // old-style
- Path path3 = new Path(new Path(DIR_3_PATH, "server3"), UUID3);
- fileToServerMap.put(path3, "server3"); // old-style
- // @formatter:off
- /*
- * Test nameToFileMap:
- * uuid1 -> /dir1/server1/uuid1
- * uuid3 -> /dir3/server3/uuid3
- */
- // @formatter:on
- Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
- nameToFileMap.put(UUID1, path1);
- nameToFileMap.put(UUID3, path3);
-
- // @formatter:off
- /*
- * Expected map:
- * server1 -> [ /dir1/server1/uuid1 ]
- * server3 -> [ /dir3/server3/uuid3 ]
- */
- // @formatter:on
- Map<String,ArrayList<Path>> result = GarbageCollectWriteAheadLogs.mapServersToFiles(fileToServerMap, nameToFileMap);
- assertEquals(2, result.size());
- ArrayList<Path> list1 = result.get("server1");
- assertEquals(1, list1.size());
- assertTrue(list1.contains(path1));
- ArrayList<Path> list3 = result.get("server3");
- assertEquals(1, list3.size());
- assertTrue(list3.contains(path3));
- }
-
- private FileStatus makeFileStatus(int size, Path path) {
- boolean isDir = (size == 0);
- return new FileStatus(size, isDir, 3, BLOCK_SIZE, modTime, path);
- }
-
- private void mockListStatus(Path dir, FileStatus... fileStatuses) throws Exception {
- expect(volMgr.listStatus(dir)).andReturn(fileStatuses);
- }
-
- @Test
- public void testScanServers_NewStyle() throws Exception {
- String[] walDirs = new String[] {"/dir1", "/dir2", "/dir3"};
- // @formatter:off
- /*
- * Test directory layout:
- * /dir1/
- * server1/
- * uuid1
- * file2
- * subdir2/
- * /dir2/ missing
- * /dir3/
- * server3/
- * uuid3
- */
- // @formatter:on
- Path serverDir1Path = new Path(DIR_1_PATH, "server1");
- FileStatus serverDir1 = makeFileStatus(0, serverDir1Path);
- Path subDir2Path = new Path(DIR_1_PATH, "subdir2");
- FileStatus serverDir2 = makeFileStatus(0, subDir2Path);
- mockListStatus(DIR_1_PATH, serverDir1, serverDir2);
- Path path1 = new Path(serverDir1Path, UUID1);
- FileStatus file1 = makeFileStatus(100, path1);
- FileStatus file2 = makeFileStatus(200, new Path(serverDir1Path, "file2"));
- mockListStatus(serverDir1Path, file1, file2);
- mockListStatus(subDir2Path);
- expect(volMgr.listStatus(DIR_2_PATH)).andThrow(new FileNotFoundException());
- Path serverDir3Path = new Path(DIR_3_PATH, "server3");
- FileStatus serverDir3 = makeFileStatus(0, serverDir3Path);
- mockListStatus(DIR_3_PATH, serverDir3);
- Path path3 = new Path(serverDir3Path, UUID3);
- FileStatus file3 = makeFileStatus(300, path3);
- mockListStatus(serverDir3Path, file3);
- replay(volMgr);
-
- Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
- Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
- int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
- assertEquals(3, count);
- // @formatter:off
- /*
- * Expected fileToServerMap:
- * /dir1/server1/uuid1 -> server1
- * /dir3/server3/uuid3 -> server3
- */
- // @formatter:on
- assertEquals(2, fileToServerMap.size());
- assertEquals("server1", fileToServerMap.get(path1));
- assertEquals("server3", fileToServerMap.get(path3));
- // @formatter:off
- /*
- * Expected nameToFileMap:
- * uuid1 -> /dir1/server1/uuid1
- * uuid3 -> /dir3/server3/uuid3
- */
- // @formatter:on
- assertEquals(2, nameToFileMap.size());
- assertEquals(path1, nameToFileMap.get(UUID1));
- assertEquals(path3, nameToFileMap.get(UUID3));
- }
-
- @Test
- public void testScanServers_OldStyle() throws Exception {
- // @formatter:off
- /*
- * Test directory layout:
- * /dir1/
- * uuid1
- * /dir3/
- * uuid3
- */
- // @formatter:on
- String[] walDirs = new String[] {"/dir1", "/dir3"};
- Path serverFile1Path = new Path(DIR_1_PATH, UUID1);
- FileStatus serverFile1 = makeFileStatus(100, serverFile1Path);
- mockListStatus(DIR_1_PATH, serverFile1);
- Path serverFile3Path = new Path(DIR_3_PATH, UUID3);
- FileStatus serverFile3 = makeFileStatus(300, serverFile3Path);
- mockListStatus(DIR_3_PATH, serverFile3);
- replay(volMgr);
-
- Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
- Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
- int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
- /*
- * Expect only a single server, the non-server entry for upgrade WALs
- */
- assertEquals(1, count);
- // @formatter:off
- /*
- * Expected fileToServerMap:
- * /dir1/uuid1 -> ""
- * /dir3/uuid3 -> ""
- */
- // @formatter:on
- assertEquals(2, fileToServerMap.size());
- assertEquals("", fileToServerMap.get(serverFile1Path));
- assertEquals("", fileToServerMap.get(serverFile3Path));
- // @formatter:off
- /*
- * Expected nameToFileMap:
- * uuid1 -> /dir1/uuid1
- * uuid3 -> /dir3/uuid3
- */
- // @formatter:on
- assertEquals(2, nameToFileMap.size());
- assertEquals(serverFile1Path, nameToFileMap.get(UUID1));
- assertEquals(serverFile3Path, nameToFileMap.get(UUID3));
- }
-
- @Test
- public void testGetSortedWALogs() throws Exception {
- String[] recoveryDirs = new String[] {"/dir1", "/dir2", "/dir3"};
- // @formatter:off
- /*
- * Test directory layout:
- * /dir1/
- * uuid1
- * file2
- * /dir2/ missing
- * /dir3/
- * uuid3
- */
- // @formatter:on
- expect(volMgr.exists(DIR_1_PATH)).andReturn(true);
- expect(volMgr.exists(DIR_2_PATH)).andReturn(false);
- expect(volMgr.exists(DIR_3_PATH)).andReturn(true);
- Path path1 = new Path(DIR_1_PATH, UUID1);
- FileStatus file1 = makeFileStatus(100, path1);
- FileStatus file2 = makeFileStatus(200, new Path(DIR_1_PATH, "file2"));
- mockListStatus(DIR_1_PATH, file1, file2);
- Path path3 = new Path(DIR_3_PATH, UUID3);
- FileStatus file3 = makeFileStatus(300, path3);
- mockListStatus(DIR_3_PATH, file3);
- replay(volMgr);
-
- Map<String,Path> sortedWalogs = gcwal.getSortedWALogs(recoveryDirs);
- // @formatter:off
- /*
- * Expected map:
- * uuid1 -> /dir1/uuid1
- * uuid3 -> /dir3/uuid3
- */
- // @formatter:on
- assertEquals(2, sortedWalogs.size());
- assertEquals(path1, sortedWalogs.get(UUID1));
- assertEquals(path3, sortedWalogs.get(UUID3));
- }
-
- @Test
- public void testIsUUID() {
- assertTrue(GarbageCollectWriteAheadLogs.isUUID(UUID.randomUUID().toString()));
- assertFalse(GarbageCollectWriteAheadLogs.isUUID("foo"));
- assertFalse(GarbageCollectWriteAheadLogs.isUUID("0" + UUID.randomUUID().toString()));
- assertFalse(GarbageCollectWriteAheadLogs.isUUID(null));
- }
-
- // It was easier to do this than get the mocking working for me
- private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs {
-
- private List<Entry<Key,Value>> replData;
-
- ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List<Entry<Key,Value>> replData) throws IOException {
- super(context, fs, useTrash);
- this.replData = replData;
- }
-
- @Override
- protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
- return this.replData;
- }
- }
-
- @Test
- public void replicationEntriesAffectGC() throws Exception {
- String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
- Connector conn = createMock(Connector.class);
-
- // Write a Status record which should prevent file1 from being deleted
- LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
- replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
-
- ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, replData);
-
- replay(conn);
-
- // Open (not-closed) file must be retained
- assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
-
- // No replication data, not needed
- replData.clear();
- assertFalse(replGC.neededByReplication(conn, "/wals/" + file2));
-
- // The file is closed but not replicated, must be retained
- replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue()));
- assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
-
- // File is closed and fully replicated, can be deleted
- replData.clear();
- replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"),
- ProtobufUtil.toValue(Status.newBuilder().setInfiniteEnd(true).setBegin(Long.MAX_VALUE).setClosed(true).build())));
- assertFalse(replGC.neededByReplication(conn, "/wals/" + file1));
- }
-
- @Test
- public void removeReplicationEntries() throws Exception {
- String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
-
- Instance inst = new MockInstance(testName.getMethodName());
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
-
- GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
- long file1CreateTime = System.currentTimeMillis();
- long file2CreateTime = file1CreateTime + 50;
- BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector());
- Mutation m = new Mutation("/wals/" + file1);
- StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
- bw.addMutation(m);
- m = new Mutation("/wals/" + file2);
- StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
- bw.addMutation(m);
-
- // These WALs are potential candidates for deletion from fs
- Map<String,Path> nameToFileMap = new HashMap<>();
- nameToFileMap.put(file1, new Path("/wals/" + file1));
- nameToFileMap.put(file2, new Path("/wals/" + file2));
-
- Map<String,Path> sortedWALogs = Collections.emptyMap();
-
- // Make the GCStatus and GcCycleStats
- GCStatus status = new GCStatus();
- GcCycleStats cycleStats = new GcCycleStats();
- status.currentLog = cycleStats;
-
- // We should iterate over two entries
- Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
-
- // We should have noted that two files were still in use
- Assert.assertEquals(2l, cycleStats.inUse);
-
- // Both should have been deleted
- Assert.assertEquals(0, nameToFileMap.size());
- }
-
- @Test
- public void replicationEntriesOnlyInMetaPreventGC() throws Exception {
- String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
-
- Instance inst = new MockInstance(testName.getMethodName());
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
-
- Connector conn = context.getConnector();
-
- GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
- long file1CreateTime = System.currentTimeMillis();
- long file2CreateTime = file1CreateTime + 50;
- // Write some records to the metadata table, we haven't yet written status records to the replication table
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
- bw.addMutation(m);
-
- m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
- bw.addMutation(m);
-
- // These WALs are potential candidates for deletion from fs
- Map<String,Path> nameToFileMap = new HashMap<>();
- nameToFileMap.put(file1, new Path("/wals/" + file1));
- nameToFileMap.put(file2, new Path("/wals/" + file2));
-
- Map<String,Path> sortedWALogs = Collections.emptyMap();
-
- // Make the GCStatus and GcCycleStats objects
- GCStatus status = new GCStatus();
- GcCycleStats cycleStats = new GcCycleStats();
- status.currentLog = cycleStats;
-
- // We should iterate over two entries
- Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
-
- // We should have noted that two files were still in use
- Assert.assertEquals(2l, cycleStats.inUse);
-
- // Both should have been deleted
- Assert.assertEquals(0, nameToFileMap.size());
- }
-
- @Test
- public void noReplicationTableDoesntLimitMetatdataResults() throws Exception {
- Instance inst = new MockInstance(testName.getMethodName());
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
- Connector conn = context.getConnector();
-
- String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
- bw.addMutation(m);
- bw.close();
-
- GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
- Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
- Entry<Key,Value> entry = Iterables.getOnlyElement(data);
-
- Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString());
- }
-
- @Test
- public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception {
- Instance inst = new MockInstance(testName.getMethodName());
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
- Connector conn = context.getConnector();
-
- long walCreateTime = System.currentTimeMillis();
- String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
- bw.addMutation(m);
- bw.close();
-
- bw = ReplicationTable.getBatchWriter(conn);
- m = new Mutation(wal);
- StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
- bw.addMutation(m);
- bw.close();
-
- GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
- Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal);
- Map<Key,Value> data = new HashMap<>();
- for (Entry<Key,Value> e : iter) {
- data.put(e.getKey(), e.getValue());
- }
-
- Assert.assertEquals(2, data.size());
-
- // Should get one element from each table (metadata and replication)
- for (Key k : data.keySet()) {
- String row = k.getRow().toString();
- if (row.startsWith(ReplicationSection.getRowPrefix())) {
- Assert.assertTrue(row.endsWith(wal));
- } else {
- Assert.assertEquals(wal, row);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index 3115de1..78a5bd5 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -50,14 +50,12 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
@@ -130,22 +128,16 @@ public class CloseWriteAheadLogReferencesTest {
public void findOneWalFromMetadata() throws Exception {
Connector conn = createMock(Connector.class);
BatchScanner bs = createMock(BatchScanner.class);
-
// Fake out some data
final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
- logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID();
- logEntry.server = "tserver1";
- logEntry.tabletId = 1;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+ String file = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
+ data.add(entry("tserver1:9997[1234567890]", file));
// Get a batchscanner, scan the current logs section, fetch only the logs
expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+ bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
expectLastCall().once();
- bs.fetchColumnFamily(LogColumnFamily.NAME);
+ bs.fetchColumnFamily(CurrentLogsSection.COLF);
expectLastCall().once();
expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
@@ -163,54 +155,12 @@ public class CloseWriteAheadLogReferencesTest {
// Validate
Set<String> wals = refs.getReferencedWals(conn);
- Assert.assertEquals(Collections.singleton(logEntry.filename), wals);
-
- verify(conn, bs);
- }
-
- @Test
- public void findManyWalFromSingleMetadata() throws Exception {
- Connector conn = createMock(Connector.class);
- BatchScanner bs = createMock(BatchScanner.class);
-
- // Fake out some data
- final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
- logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID();
- logEntry.server = "tserver1";
- logEntry.tabletId = 1;
- // Multiple DFSLoggers
- logEntry.logSet = Sets.newHashSet(logEntry.filename, "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID());
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- // Get a batchscanner, scan the tablets section, fetch only the logs
- expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
- expectLastCall().once();
- bs.fetchColumnFamily(LogColumnFamily.NAME);
- expectLastCall().once();
- expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
-
- @Override
- public Iterator<Entry<Key,Value>> answer() throws Throwable {
- return data.iterator();
- }
-
- });
- // Close the bs
- bs.close();
- expectLastCall().once();
-
- replay(conn, bs);
-
- // Validate
- Set<String> wals = refs.getReferencedWals(conn);
- Assert.assertEquals(logEntry.logSet, wals);
+ Assert.assertEquals(Collections.singleton(file), wals);
verify(conn, bs);
}
+ // This is a silly test now: only a single entry references the WAL, so it is effectively the same as findOneWalFromMetadata
@Test
public void findManyRefsToSingleWalFromMetadata() throws Exception {
Connector conn = createMock(Connector.class);
@@ -220,31 +170,14 @@ public class CloseWriteAheadLogReferencesTest {
// Fake out some data
final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
- logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + uuid;
- logEntry.server = "tserver1";
- logEntry.tabletId = 1;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("1"), new Text("c"), new Text("b"));
- logEntry.server = "tserver1";
- logEntry.tabletId = 2;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("1"), null, new Text("c"));
- logEntry.server = "tserver1";
- logEntry.tabletId = 3;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+ String filename = "hdfs://localhost:8020/accumulo/wal/tserver+9997/" + uuid;
+ data.add(entry("tserver1:9997[0123456789]", filename));
// Get a batchscanner, scan the current logs section, fetch only the logs
expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+ bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
expectLastCall().once();
- bs.fetchColumnFamily(LogColumnFamily.NAME);
+ bs.fetchColumnFamily(CurrentLogsSection.COLF);
expectLastCall().once();
expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
@@ -262,7 +195,7 @@ public class CloseWriteAheadLogReferencesTest {
// Validate
Set<String> wals = refs.getReferencedWals(conn);
- Assert.assertEquals(Collections.singleton(logEntry.filename), wals);
+ Assert.assertEquals(Collections.singleton(filename), wals);
verify(conn, bs);
}
@@ -272,59 +205,22 @@ public class CloseWriteAheadLogReferencesTest {
Connector conn = createMock(Connector.class);
BatchScanner bs = createMock(BatchScanner.class);
- String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+port/" + UUID.randomUUID(), file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+port/"
- + UUID.randomUUID(), file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+port/" + UUID.randomUUID();
+ String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
+ String file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+9997/" + UUID.randomUUID();
+ String file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+9997/" + UUID.randomUUID();
// Fake out some data
final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
- logEntry.filename = file1;
- logEntry.server = "tserver1";
- logEntry.tabletId = 1;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("5"), null, null);
- logEntry.tabletId = 2;
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("3"), new Text("b"), new Text("a"));
- logEntry.filename = file2;
- logEntry.server = "tserver2";
- logEntry.tabletId = 3;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("3"), new Text("c"), new Text("b"));
- logEntry.tabletId = 4;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("4"), new Text("5"), new Text("0"));
- logEntry.filename = file3;
- logEntry.server = "tserver3";
- logEntry.tabletId = 5;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("4"), new Text("8"), new Text("5"));
- logEntry.server = "tserver3";
- logEntry.tabletId = 7;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("4"), null, new Text("8"));
- logEntry.server = "tserver3";
- logEntry.tabletId = 15;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+ data.add(entry("tserver1:9997[1234567890]", file1));
+ data.add(entry("tserver2:9997[1234567891]", file2));
+ data.add(entry("tserver3:9997[1234567891]", file3));
// Get a batchscanner, scan the tablets section, fetch only the logs
expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+ bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
expectLastCall().once();
- bs.fetchColumnFamily(LogColumnFamily.NAME);
+ bs.fetchColumnFamily(CurrentLogsSection.COLF);
expectLastCall().once();
expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
@@ -347,6 +243,11 @@ public class CloseWriteAheadLogReferencesTest {
verify(conn, bs);
}
+ private static Entry<Key,Value> entry(String session, String file) {
+ Key key = new Key(new Text(CurrentLogsSection.getRowPrefix() + session), CurrentLogsSection.COLF, new Text(file));
+ return Maps.immutableEntry(key, new Value());
+ }
+
@Test
public void unusedWalsAreClosed() throws Exception {
Set<String> wals = Collections.emptySet();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 4d52cc5..810134b 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -422,6 +422,9 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ);
}
perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE);
+
+ // add the currlog location for root tablet current logs
+ zoo.putPersistentData(ZooUtil.getRoot(getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS, new byte[0], NodeExistsPolicy.SKIP);
haveUpgradedZooKeeper = true;
} catch (Exception ex) {
// ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 592d9ae..3f15b39 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -173,7 +173,8 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli
scanner.setRange(MetadataSchema.TabletsSection.getRange());
} else {
scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- scanner.setRange(new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange());
+ Range range = new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange();
+ scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
}
TabletsSection.ServerColumnFamily.FLUSH_COLUMN.fetch(scanner);
TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 4c47953..20917a4 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -163,6 +163,7 @@ class TabletGroupWatcher extends Daemon {
List<Assignment> assigned = new ArrayList<Assignment>();
List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
+ Map<TServerInstance, List<Path>> logsForDeadServers = new TreeMap<>();
MasterState masterState = master.getMasterState();
int[] counts = new int[TabletState.values().length];
@@ -175,6 +176,7 @@ class TabletGroupWatcher extends Daemon {
if (tls == null) {
continue;
}
+ Master.log.debug(store.name() + " location State: " + tls);
// ignore entries for tables that do not exist in zookeeper
if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
continue;
@@ -184,7 +186,7 @@ class TabletGroupWatcher extends Daemon {
// Don't overwhelm the tablet servers with work
if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
- flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+ flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned);
assignments.clear();
assigned.clear();
assignedToDeadServers.clear();
@@ -204,8 +206,9 @@ class TabletGroupWatcher extends Daemon {
TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo());
TServerInstance server = tls.getServer();
TabletState state = tls.getState(currentTServers.keySet());
- if (Master.log.isTraceEnabled())
- Master.log.trace("Goal state " + goal + " current " + state);
+ if (Master.log.isTraceEnabled()) {
+ Master.log.trace("Goal state " + goal + " current " + state + " for " + tls.extent);
+ }
stats.update(tableId, state);
mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
sendChopRequest(mergeStats.getMergeInfo(), state, tls);
@@ -239,7 +242,7 @@ class TabletGroupWatcher extends Daemon {
assignedToDeadServers.add(tls);
if (server.equals(this.master.migrations.get(tls.extent)))
this.master.migrations.remove(tls.extent);
- // log.info("Current servers " + currentTServers.keySet());
+ MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
break;
case UNASSIGNED:
// maybe it's a finishing migration
@@ -273,7 +276,7 @@ class TabletGroupWatcher extends Daemon {
break;
case ASSIGNED_TO_DEAD_SERVER:
assignedToDeadServers.add(tls);
- // log.info("Current servers " + currentTServers.keySet());
+ MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
break;
case HOSTED:
TServerConnection conn = this.master.tserverSet.getConnection(server);
@@ -292,7 +295,8 @@ class TabletGroupWatcher extends Daemon {
counts[state.ordinal()]++;
}
- flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+ flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned);
+ store.markLogsAsUnused(master, logsForDeadServers);
// provide stats after flushing changes to avoid race conditions w/ delete table
stats.end(masterState);
@@ -312,8 +316,12 @@ class TabletGroupWatcher extends Daemon {
updateMergeState(mergeStatsCache);
- Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
- eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+ if (this.master.tserverSet.getCurrentServers().equals(currentTServers.keySet())) {
+ Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
+ eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+ } else {
+ Master.log.info("Detected change in current tserver set, re-running state machine.");
+ }
} catch (Exception ex) {
Master.log.error("Error processing table state for store " + store.name(), ex);
if (ex.getCause() != null && ex.getCause() instanceof BadLocationStateException) {
@@ -730,12 +738,19 @@ class TabletGroupWatcher extends Daemon {
}
}
- private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
- List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
+ private void flushChanges(
+ SortedMap<TServerInstance,TabletServerStatus> currentTServers,
+ List<Assignment> assignments,
+ List<Assignment> assigned,
+ List<TabletLocationState> assignedToDeadServers,
+ Map<TServerInstance, List<Path>> logsForDeadServers,
+ Map<KeyExtent,TServerInstance> unassigned)
+ throws DistributedStoreException, TException {
if (!assignedToDeadServers.isEmpty()) {
int maxServersToShow = min(assignedToDeadServers.size(), 100);
Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
- store.unassign(assignedToDeadServers);
+ Master.log.debug("logs for dead servers: " + logsForDeadServers);
+ store.unassign(assignedToDeadServers, logsForDeadServers);
this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index 8532e1b..74e9b78 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -107,6 +107,7 @@ public class WorkMaker {
// Don't create the record if we have nothing to do.
// TODO put this into a filter on serverside
if (!shouldCreateWork(status)) {
+ log.debug("Not creating work: " + status.toString());
continue;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
index a3c7e46..f1763be 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.cli.ClientOpts;
@@ -187,7 +188,7 @@ public class MergeStats {
Text tableId = extent.getTableId();
Text first = KeyExtent.getMetadataEntry(tableId, start);
Range range = new Range(first, false, null, true);
- scanner.setRange(range);
+ scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
KeyExtent prevExtent = null;
log.debug("Scanning range " + range);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
index 6790858..3c3bc37 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.master;
-import java.util.Arrays;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
@@ -306,13 +305,7 @@ public class ReplicationOperationsImplTest {
bw.addMutation(m);
bw.close();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text(tableId1), null, null);
- logEntry.server = "tserver";
- logEntry.filename = file1;
- logEntry.tabletId = 1;
- logEntry.logSet = Arrays.asList(file1);
- logEntry.timestamp = System.currentTimeMillis();
+ LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId1), null, null), System.currentTimeMillis(), "tserver", file1);
bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
m = new Mutation(ReplicationSection.getRowPrefix() + file1);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
index 634ee89..8cbea68 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
@@ -186,7 +186,7 @@ public class TestMergeState {
// take it offline
m = tablet.getPrevRowUpdateMutation();
Collection<Collection<String>> walogs = Collections.emptyList();
- metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)));
+ metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null);
// now we can split
stats = scan(state, metaDataStateStore);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
index d2cc0cf..4002da5 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
@@ -181,7 +181,7 @@ public class RootTabletStateStoreTest {
} catch (BadLocationStateException e) {
fail("Unexpected error " + e);
}
- tstore.unassign(Collections.singletonList(assigned));
+ tstore.unassign(Collections.singletonList(assigned), null);
count = 0;
for (TabletLocationState location : tstore) {
assertEquals(location.extent, root);
@@ -209,7 +209,7 @@ public class RootTabletStateStoreTest {
fail("Unexpected error " + e);
}
try {
- tstore.unassign(Collections.singletonList(broken));
+ tstore.unassign(Collections.singletonList(broken), null);
Assert.fail("should not get here");
} catch (IllegalArgumentException ex) {}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/findbugs/exclude-filter.xml
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/findbugs/exclude-filter.xml b/server/tserver/src/main/findbugs/exclude-filter.xml
index 47dd1f5..a334163 100644
--- a/server/tserver/src/main/findbugs/exclude-filter.xml
+++ b/server/tserver/src/main/findbugs/exclude-filter.xml
@@ -18,7 +18,7 @@
<Match>
<!-- locking is confusing, but probably correct -->
<Class name="org.apache.accumulo.tserver.tablet.Tablet" />
- <Method name="beginUpdatingLogsUsed" params="org.apache.accumulo.tserver.InMemoryMap,java.util.Collection,boolean" returns="boolean" />
+ <Method name="beginUpdatingLogsUsed" params="org.apache.accumulo.tserver.InMemoryMap,org.apache.accumulo.tserver.log.DfsLogger,boolean" returns="boolean" />
<Bug code="UL" pattern="UL_UNRELEASED_LOCK" />
</Match>
<Match>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java b/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
index 389a544..5fe2548 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
@@ -35,7 +35,8 @@ public class GarbageCollectionLogger {
private long gcTimeIncreasedCount = 0;
private static long lastMemoryCheckTime = 0;
- public GarbageCollectionLogger() {}
+ public GarbageCollectionLogger() {
+ }
public synchronized void logGCInfo(AccumuloConfiguration conf) {
final long now = System.currentTimeMillis();